yuzelin commented on code in PR #21677:
URL: https://github.com/apache/flink/pull/21677#discussion_r1071864076


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultResponseBodyImpl.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+
+import javax.annotation.Nullable;
+
+/** The implementation of the {@link FetchResultsResponseBody} with ready 
results. */
+public class FetchResultResponseBodyImpl implements FetchResultsResponseBody {
+
+    private final ResultInfo results;
+    private final ResultSet.ResultType resultType;
+    private final String nextResultUri;
+    private final boolean isQueryResult;
+    private final JobID jobID;
+    private final ResultKind resultKind;
+
+    public FetchResultResponseBodyImpl(
+            ResultSet.ResultType resultType,
+            boolean isQueryResult,
+            @Nullable JobID jobID,
+            ResultKind resultKind,
+            ResultInfo results,
+            @Nullable String nextResultUri) {
+        this.results = results;
+        this.resultType = resultType;
+        this.nextResultUri = nextResultUri;
+        this.isQueryResult = isQueryResult;
+        this.jobID = jobID;
+        this.resultKind = resultKind;
+    }

Review Comment:
   The fields `jobID` and `nextResultUri` should annotated `@Nullable`.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/NotReadyFetchResultResponse.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+
+import javax.annotation.Nullable;
+
+/** The {@link FetchResultsResponseBody} indicates the results is not ready. */
+public class NotReadyFetchResultResponse implements FetchResultsResponseBody {
+
+    private final String nextResultUri;
+
+    public NotReadyFetchResultResponse(String nextResultUri) {
+        this.nextResultUri = nextResultUri;
+    }
+
+    @Override
+    public ResultInfo getResults() {
+        throw new SqlGatewayException(
+                "The result is not ready. Please fetch results until the 
result type is PAYLOAD or EOS");

Review Comment:
   Missing `.` at the end of message.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java:
##########
@@ -30,64 +32,82 @@
 import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
 import 
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPathParameter;
 import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultResponseBodyImpl;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
-import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsRowFormatQueryParameter;
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
+import 
org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /** Handler to fetch results. */
 public class FetchResultsHandler
         extends AbstractSqlGatewayRestHandler<
-                EmptyRequestBody, FetchResultsResponseBody, 
FetchResultsTokenParameters> {
+                EmptyRequestBody, FetchResultsResponseBody, 
FetchResultsMessageParameters> {
 
     public FetchResultsHandler(
             SqlGatewayService service,
             Map<String, String> responseHeaders,
-            MessageHeaders<EmptyRequestBody, FetchResultsResponseBody, 
FetchResultsTokenParameters>
+            MessageHeaders<
+                            EmptyRequestBody,
+                            FetchResultsResponseBody,
+                            FetchResultsMessageParameters>
                     messageHeaders) {
         super(service, responseHeaders, messageHeaders);
     }
 
     @Override
     protected CompletableFuture<FetchResultsResponseBody> handleRequest(
-            SqlGatewayRestAPIVersion version, @Nonnull 
HandlerRequest<EmptyRequestBody> request) {
+            SqlGatewayRestAPIVersion version, @Nonnull 
HandlerRequest<EmptyRequestBody> request)

Review Comment:
   How about delete the `@Nonnull` (and also delete it from it's parent)?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsRowFormatQueryParameter.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+
+/**
+ * A {@link MessageQueryParameter} that parses the 'rowFormat' query parameter 
for fetching results.
+ */
+public class FetchResultsRowFormatQueryParameter extends 
MessageQueryParameter<RowFormat> {
+
+    public static final String KEY = "rowFormat";
+

Review Comment:
   Seems other REST APIs use `_`, like `SessionHandleIdPathParameter.KEY`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to