xuyangzhong commented on code in PR #21665:
URL: https://github.com/apache/flink/pull/21665#discussion_r1071820866


##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java:
##########
@@ -20,46 +20,66 @@
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.client.config.ResultMode;
+import org.apache.flink.table.client.gateway.local.result.ChangelogCollectResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedCollectBatchResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
 
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.client.config.ResultMode.CHANGELOG;
+import static org.apache.flink.table.client.config.ResultMode.TABLE;
 import static org.apache.flink.table.client.config.SqlClientOptions.DISPLAY_MAX_COLUMN_WIDTH;
+import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
 import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
 
 /** Describes a result to be expected from a table program. */
 public class ResultDescriptor {
 
-    private final String resultId;
-    private final ResolvedSchema resultSchema;
-    private final boolean isMaterialized;
+    private final TableResultInternal tableResult;
     private final ReadableConfig config;
-    private final RowDataToStringConverter rowDataToStringConverter;
 
-    public ResultDescriptor(
-            String resultId,
-            ResolvedSchema resultSchema,
-            boolean isMaterialized,
-            ReadableConfig config,
-            RowDataToStringConverter rowDataToStringConverter) {
-        this.resultId = resultId;
-        this.resultSchema = resultSchema;
-        this.isMaterialized = isMaterialized;
+    public ResultDescriptor(TableResultInternal tableResult, ReadableConfig config) {
+        this.tableResult = tableResult;
         this.config = config;
-        this.rowDataToStringConverter = rowDataToStringConverter;
     }
 
-    public String getResultId() {
-        return resultId;
+    @SuppressWarnings("unchecked")
+    public <T extends DynamicResult> T createResult() {
+        if (config.get(EXECUTION_RESULT_MODE).equals(CHANGELOG)

Review Comment:
   nit: use a local variable to avoid getting the value multiple times.
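   A rough, non-authoritative sketch of that suggestion (only `EXECUTION_RESULT_MODE`, `RUNTIME_MODE`, `CHANGELOG` and `TABLE` come from the diff; the branch bodies and constructor arguments are assumed and abbreviated):

```java
@SuppressWarnings("unchecked")
public <T extends DynamicResult> T createResult() {
    // Read both config values once and reuse the locals below.
    ResultMode resultMode = config.get(EXECUTION_RESULT_MODE);
    RuntimeExecutionMode runtimeMode = config.get(RUNTIME_MODE);
    if (resultMode.equals(CHANGELOG) && runtimeMode == RuntimeExecutionMode.STREAMING) {
        return (T) new ChangelogCollectResult(tableResult);
    }
    // ... remaining TABLE / batch branches stay as in the PR, using the locals above
    throw new IllegalStateException("Unsupported result mode: " + resultMode);
}
```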



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -75,15 +69,13 @@ public class LocalExecutor implements Executor {
     private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
 
     // result maintenance
-    private final ResultStore resultStore;

Review Comment:
   It seems some code in ResultStore can now be cleaned up.



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java:
##########
 @@ -174,6 +180,9 @@ private void printStreamingResults(AtomicInteger receivedRowCount) {
                 case PAYLOAD:
                     List<RowData> changes = result.getPayload();
                     for (RowData change : changes) {
+                        if (Thread.currentThread().isInterrupted()) {

Review Comment:
   What about moving this code block to the outer loop?
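   For illustration, a minimal sketch of hoisting the check into the outer loop (the polling and printing helpers below are hypothetical placeholders, not methods from the PR):

```java
// Check for interruption once per polled batch rather than once per row.
while (!Thread.currentThread().isInterrupted()) {
    TypedResult<List<RowData>> result = pollNextResult(); // hypothetical helper
    switch (result.getType()) {
        case PAYLOAD:
            for (RowData change : result.getPayload()) {
                printChange(change); // hypothetical helper
            }
            break;
        default:
            // EMPTY / EOS handling unchanged
            break;
    }
}
```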



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectResultBase.java:
##########
@@ -29,23 +29,28 @@
 
 /** A result that works through {@link TableResult#collect()}. */
 public abstract class CollectResultBase implements DynamicResult {
-    private final CloseableIterator<RowData> result;
+    private final CloseableIterator<RowData> resultIterator;
 
     protected final Object resultLock;
     protected AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();
     protected final ResultRetrievalThread retrievalThread;
 
     public CollectResultBase(TableResultInternal tableResult) {
-        result = tableResult.collectInternal();
+        resultIterator = tableResult.collectInternal();
         resultLock = new Object();
         retrievalThread = new ResultRetrievalThread();
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {

Review Comment:
   Why change this to catch the exception internally?
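   For comparison, the previous shape of the method simply propagated the checked exception from the iterator; a minimal sketch (only `resultIterator` is taken from the diff, the rest of the cleanup is omitted):

```java
@Override
public void close() throws Exception {
    // Let callers decide how to handle a failure while releasing the iterator.
    resultIterator.close();
}
```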



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java:
##########
@@ -361,25 +364,52 @@ protected int getHeight() {
         return height;
     }
 
+    public void clearTerminal() {
+        if (isPlainTerminal()) {
+            for (int i = 0; i < 200; i++) { // large number of empty lines
+                terminal.writer().println();
+            }
+        } else {
+            terminal.puts(InfoCmp.Capability.clear_screen);
+        }
+    }
+
+    protected boolean isPlainTerminal() {

Review Comment:
   IMO, it's not a good idea to duplicate the same code block here and in "CliClient#isPlainTerminal".



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -75,15 +69,13 @@ public class LocalExecutor implements Executor {
     private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
 
     // result maintenance
-    private final ResultStore resultStore;

Review Comment:
   Should we still store the results of multiple jobs in memory, as before?


