fsk119 commented on code in PR #21502:
URL: https://github.com/apache/flink/pull/21502#discussion_r1066514320


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -506,24 +521,16 @@ public ResultFetcher callStopJobOperation(
                     "Could not stop job " + jobId + " for operation " + handle 
+ ".", e);
         }
         if (isWithSavepoint) {
-            return new ResultFetcher(
+            return ResultFetcher.fromResults(
                     handle,
                     ResolvedSchema.of(Column.physical(SAVEPOINT_PATH, 
DataTypes.STRING())),
                     Collections.singletonList(
                             
GenericRowData.of(StringData.fromString(savepoint.orElse("")))));
         } else {
-            return buildOkResultFetcher(handle);
+            return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, 
false, null);

Review Comment:
   Replace with `return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);`



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -50,41 +59,105 @@ public class ResultFetcher {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ResultFetcher.class);
     private static final int TABLE_RESULT_MAX_INITIAL_CAPACITY = 5000;
+    private static final RowDataToStringConverter DEFAULT_CONVERTER =
+            SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
 
     private final OperationHandle operationHandle;
 
     private final ResolvedSchema resultSchema;
     private final ResultStore resultStore;
     private final LinkedList<RowData> bufferedResults = new LinkedList<>();
     private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+    private final RowDataToStringConverter converter;
+
+    private final boolean isQueryResult;
+
+    @Nullable private final JobID jobID;
+
+    private final ResultKind resultKind;
 
     private long currentToken = 0;
     private boolean noMoreResults = false;
 
-    public ResultFetcher(
+    private ResultFetcher(
             OperationHandle operationHandle,
             ResolvedSchema resultSchema,
-            CloseableIterator<RowData> resultRows) {
-        this(operationHandle, resultSchema, resultRows, 
TABLE_RESULT_MAX_INITIAL_CAPACITY);
+            CloseableIterator<RowData> resultRows,
+            RowDataToStringConverter converter,
+            boolean isQueryResult,
+            @Nullable JobID jobID,
+            ResultKind resultKind) {
+        this(
+                operationHandle,
+                resultSchema,
+                resultRows,
+                converter,
+                isQueryResult,
+                jobID,
+                resultKind,
+                TABLE_RESULT_MAX_INITIAL_CAPACITY);
     }
 
     @VisibleForTesting
     ResultFetcher(
             OperationHandle operationHandle,
             ResolvedSchema resultSchema,
             CloseableIterator<RowData> resultRows,
+            RowDataToStringConverter converter,
+            boolean isQueryResult,
+            @Nullable JobID jobID,
+            ResultKind resultKind,
             int maxBufferSize) {
         this.operationHandle = operationHandle;
         this.resultSchema = resultSchema;
         this.resultStore = new ResultStore(resultRows, maxBufferSize);
+        this.converter = converter;
+        this.isQueryResult = isQueryResult;
+        this.jobID = jobID;
+        this.resultKind = resultKind;
     }
 
-    public ResultFetcher(
-            OperationHandle operationHandle, ResolvedSchema resultSchema, 
List<RowData> rows) {
+    private ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            List<RowData> rows,
+            @Nullable JobID jobID) {
         this.operationHandle = operationHandle;
         this.resultSchema = resultSchema;
         this.bufferedResults.addAll(rows);
         this.resultStore = ResultStore.DUMMY_RESULT_STORE;
+        this.converter = DEFAULT_CONVERTER;
+        this.isQueryResult = false;
+        this.jobID = jobID;
+        this.resultKind = ResultKind.SUCCESS_WITH_CONTENT;
+    }
+
+    public static ResultFetcher fromTableResult(
+            OperationHandle operationHandle,
+            TableResultInternal tableResult,
+            boolean isQueryResult,
+            @Nullable JobID jobID) {

Review Comment:
   I think the job ID is already available from the `TableResult`, so we can remove this `jobID` parameter.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +283,26 @@ public synchronized ResultSet fetchResults(long token, int 
maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEOSResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {

Review Comment:
   nit: BTW, I think these two methods should belong to `ResultSetImpl`. WDYT?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -45,12 +46,12 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static 
org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
-
 /** Manager for the {@link Operation}. */
 @Internal
 public class OperationManager {
 
+    public static final NotReadyResult NOT_READY_RESULT = new NotReadyResult();

Review Comment:
   Move this constant to `NotReadyResult`.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+
+import java.util.Collections;
+import java.util.List;
+
+/** To represent that the execution result is not ready to fetch. */
+public class NotReadyResult implements ResultSet {
+
+    @Override
+    public ResultType getResultType() {
+        return ResultType.NOT_READY;
+    }
+
+    @Override
+    public Long getNextToken() {
+        return 0L;
+    }
+
+    @Override
+    public ResolvedSchema getResultSchema() {
+        return ResolvedSchema.of(Collections.emptyList());
+    }
+
+    @Override
+    public List<RowData> getData() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isQueryResult() {
+        throw new UnsupportedOperationException(
+                "Can't know whether a NOT_READY_RESULT is for a query.");

Review Comment:
   Change "Can't" to "Don't" in this message.
   
   Also add "Please continue fetching results until the result type is PAYLOAD or EOS."



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+
+import java.util.Collections;
+import java.util.List;
+
+/** To represent that the execution result is not ready to fetch. */
+public class NotReadyResult implements ResultSet {
+
+    @Override
+    public ResultType getResultType() {
+        return ResultType.NOT_READY;
+    }
+
+    @Override
+    public Long getNextToken() {
+        return 0L;
+    }
+
+    @Override
+    public ResolvedSchema getResultSchema() {
+        return ResolvedSchema.of(Collections.emptyList());
+    }
+
+    @Override
+    public List<RowData> getData() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isQueryResult() {
+        throw new UnsupportedOperationException(
+                "Can't know whether a NOT_READY_RESULT is for a query.");
+    }
+
+    @Override
+    public JobID getJobID() {
+        throw new UnsupportedOperationException("Can't get job ID from a 
NOT_READY_RESULT.");
+    }
+
+    @Override
+    public ResultKind getResultKind() {
+        throw new UnsupportedOperationException("Can't get result kind from a 
NOT_READY_RESULT.");

Review Comment:
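   Ditto: apply the same wording change here as suggested for the `isQueryResult()` message ("Can't" -> "Don't", and add "Please continue fetching results until the result type is PAYLOAD or EOS.").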



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to