twalthr commented on a change in pull request #12688:
URL: https://github.com/apache/flink/pull/12688#discussion_r441519689
##########
File path:
flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
##########
@@ -135,7 +126,7 @@ public void testTableSourceFullScan() throws Exception {
" h.family3.col3 " +
"FROM hTable AS h");
Review comment:
For all locations that were updated in the previous commit the declared
`throws Exception` is not necessary anymore. My IDE shows a lot of warnings
`Exception 'java.lang.Exception' is never thrown in the method ` which we can
fix in this PR.
##########
File path: flink-python/pyflink/table/table_result.py
##########
@@ -48,6 +51,22 @@ def get_job_client(self):
else:
return None
+ def wait(self, timeout_ms=None):
Review comment:
call it `await` to be consistent. how are the exceptions handled in
Python?
##########
File path:
flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
##########
@@ -135,7 +126,7 @@ public void testTableSourceFullScan() throws Exception {
" h.family3.col3 " +
"FROM hTable AS h");
- List<Row> results = collectBatchResult(table);
+ List<Row> results =
Lists.newArrayList(table.execute().collect());
Review comment:
avoid dependencies to other modules for simple util functions. Introduce
an overloaded method `org.apache.flink.util.CollectionUtil#iteratorToList` in
a hotfix commit
##########
File path: flink-python/pyflink/table/table_result.py
##########
@@ -48,6 +51,22 @@ def get_job_client(self):
else:
return None
+ def wait(self, timeout_ms=None):
+ """
+ Wait if necessary for at most the given time (milliseconds) for the
data to be ready.
+
+ For select operation, this method will wait unit the first row can be
accessed in local.
Review comment:
For a select operation, this method will wait until the first row can be
accessed locally.
For an insert operation, this method will wait for the job to finish,
because the result contains only one row.
For other operations, this method will return immediately, because the
result is already available locally.
##########
File path:
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -284,9 +284,13 @@ public void testStreamingAppend() throws Exception {
tEnv.registerCatalog(hiveCatalog.getName(),
hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
- TableEnvUtil.execInsertSqlAndWaitResult(
- tEnv,
- "insert into db1.sink_table select
6,'a','b','2020-05-03','12'");
+ try {
+ tEnv.executeSql(
+ "insert into db1.sink_table
select 6,'a','b','2020-05-03','12'")
+ .await();
+ } catch (Exception e) {
+ throw new TableException("Failed to execute
sql", e);
Review comment:
throw `AssertionError` in tests with `Assert.fail()`.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1413,4 +1415,46 @@ protected TableImpl createTable(QueryOperation
tableOperation) {
operationTreeBuilder,
functionCatalog.asLookup(parser::parseIdentifier));
}
+
+ /**
+ * Iterator for insert operation result.
+ */
+ private static final class InsertResultIterator implements
CloseableIterator<Row> {
Review comment:
`TableEnvironmentImpl` is already quite large. Move it to a separate
class in default scope.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
##########
@@ -73,6 +82,50 @@ private TableResultImpl(
return Optional.ofNullable(jobClient);
}
+ @Override
+ public void await() throws InterruptedException, ExecutionException {
+ try {
+ awaitInternal(-1, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // do nothing
+ }
+ }
+
+ @Override
+ public void await(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
TimeoutException {
+ awaitInternal(timeout, unit);
+ }
+
+ private void awaitInternal(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
TimeoutException {
+ if (jobClient == null) {
+ return;
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(
+ 1, new
ThreadFactoryBuilder().setNameFormat("TableResult-await-thread").build());
Review comment:
avoid guava
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -38,6 +41,33 @@
*/
Optional<JobClient> getJobClient();
+ /**
+ * Wait if necessary until the data is ready.
+ *
+ * <p>For select operation, this method will wait unit the first row
can be accessed in local.
+ * For insert operation, this method will wait for the job to finish,
because the result contains only one row.
+ * For other operations, this method will return immediately, because
the result is ready in local.
+ *
+ * @throws ExecutionException if this future completed exceptionally
Review comment:
`if this future completed exceptionally` it is not really a future
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]