twalthr commented on a change in pull request #12688:
URL: https://github.com/apache/flink/pull/12688#discussion_r441519689



##########
File path: 
flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
##########
@@ -135,7 +126,7 @@ public void testTableSourceFullScan() throws Exception {
                        "  h.family3.col3 " +
                        "FROM hTable AS h");
 

Review comment:
       For all locations that were updated in the previous commit the declared 
`throws Exception` is not necessary anymore. My IDE shows a lot of warnings 
`Exception 'java.lang.Exception' is never thrown in the method ` which we can 
fix in this PR.

##########
File path: flink-python/pyflink/table/table_result.py
##########
@@ -48,6 +51,22 @@ def get_job_client(self):
         else:
             return None
 
+    def wait(self, timeout_ms=None):

Review comment:
       call it `await` to be consistent. how are the exceptions handled in 
Python?

##########
File path: 
flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
##########
@@ -135,7 +126,7 @@ public void testTableSourceFullScan() throws Exception {
                        "  h.family3.col3 " +
                        "FROM hTable AS h");
 
-               List<Row> results = collectBatchResult(table);
+               List<Row> results = 
Lists.newArrayList(table.execute().collect());

Review comment:
       avoid dependencies to other modules for simple util functions. Introduce 
an overloaded method  `org.apache.flink.util.CollectionUtil#iteratorToList` in 
a hotfix commit

##########
File path: flink-python/pyflink/table/table_result.py
##########
@@ -48,6 +51,22 @@ def get_job_client(self):
         else:
             return None
 
+    def wait(self, timeout_ms=None):
+        """
+        Wait if necessary for at most the given time (milliseconds) for the 
data to be ready.
+
+        For select operation, this method will wait unit the first row can be 
accessed in local.

Review comment:
       For a select operation, this method will wait until the first row can be 
accessed locally.
   For an insert operation, this method will wait for the job to finish, 
because the result contains only one row.
   For other operations, this method will return immediately, because the 
result is already available locally.

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -284,9 +284,13 @@ public void testStreamingAppend() throws Exception {
                        tEnv.registerCatalog(hiveCatalog.getName(), 
hiveCatalog);
                        tEnv.useCatalog(hiveCatalog.getName());
 
-                       TableEnvUtil.execInsertSqlAndWaitResult(
-                                       tEnv,
-                                       "insert into db1.sink_table select 
6,'a','b','2020-05-03','12'");
+                       try {
+                               tEnv.executeSql(
+                                               "insert into db1.sink_table 
select 6,'a','b','2020-05-03','12'")
+                                               .await();
+                       } catch (Exception e) {
+                               throw new TableException("Failed to execute 
sql", e);

Review comment:
       throw `AssertionError` in tests with `Assert.fail()`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1413,4 +1415,46 @@ protected TableImpl createTable(QueryOperation 
tableOperation) {
                        operationTreeBuilder,
                        functionCatalog.asLookup(parser::parseIdentifier));
        }
+
+       /**
+        * Iterator for insert operation result.
+        */
+       private static final class InsertResultIterator implements 
CloseableIterator<Row> {

Review comment:
       `TableEnvironmentImpl` is already quite large. Move it to a separate 
class in default scope.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
##########
@@ -73,6 +82,50 @@ private TableResultImpl(
                return Optional.ofNullable(jobClient);
        }
 
+       @Override
+       public void await() throws InterruptedException, ExecutionException {
+               try {
+                       awaitInternal(-1, TimeUnit.MILLISECONDS);
+               } catch (TimeoutException e) {
+                       // do nothing
+               }
+       }
+
+       @Override
+       public void await(long timeout, TimeUnit unit)
+                       throws InterruptedException, ExecutionException, 
TimeoutException {
+               awaitInternal(timeout, unit);
+       }
+
+       private void awaitInternal(long timeout, TimeUnit unit)
+                       throws InterruptedException, ExecutionException, 
TimeoutException {
+               if (jobClient == null) {
+                       return;
+               }
+
+               ExecutorService executor = Executors.newFixedThreadPool(
+                               1, new 
ThreadFactoryBuilder().setNameFormat("TableResult-await-thread").build());

Review comment:
       avoid guava

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -38,6 +41,33 @@
         */
        Optional<JobClient> getJobClient();
 
+       /**
+        * Wait if necessary until the data is ready.
+        *
+        * <p>For select operation, this method will wait unit the first row 
can be accessed in local.
+        * For insert operation, this method will wait for the job to finish, 
because the result contains only one row.
+        * For other operations, this method will return immediately, because 
the result is ready in local.
+        *
+        * @throws ExecutionException if this future completed exceptionally

Review comment:
       `if this future completed exceptionally` it is not really a future




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to