xintongsong commented on code in PR #20236:
URL: https://github.com/apache/flink/pull/20236#discussion_r920845321


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.util.RestEndpointVersion;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Super class for sql gateway handlers that work with {@link RequestBody}s 
and {@link
+ * ResponseBody}s.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+public abstract class AbstractSqlGatewayRestHandler<
+                R extends RequestBody, P extends ResponseBody, M extends 
MessageParameters>
+        extends AbstractHandler<RestfulGateway, R, M> {
+
+    private static final EmptyLeaderAdapter EMPTY_LEADER_ADAPTER = new 
EmptyLeaderAdapter();
+    protected SqlGatewayService service;
+
+    private final MessageHeaders<R, P, M> messageHeaders;
+
+    protected AbstractSqlGatewayRestHandler(
+            SqlGatewayService service,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<R, P, M> messageHeaders) {
+        super(
+                () -> CompletableFuture.completedFuture(EMPTY_LEADER_ADAPTER),
+                timeout,
+                responseHeaders,
+                messageHeaders);
+        this.service = service;
+        this.messageHeaders = messageHeaders;
+    }
+
+    @Override
+    protected CompletableFuture<Void> respondToRequest(
+            ChannelHandlerContext ctx,
+            HttpRequest httpRequest,
+            HandlerRequest<R> handlerRequest,
+            RestfulGateway gateway) {
+        CompletableFuture<P> response;
+
+        String uri = httpRequest.uri();
+        int slashIndex = uri.indexOf('/', 1);
+        if (slashIndex < 0) {
+            slashIndex = uri.length();
+        }
+
+        RestEndpointVersion version = 
RestEndpointVersion.valueOf(uri.substring(1, slashIndex));
+
+        try {
+            response = handleRequest(version, handlerRequest);
+        } catch (RestHandlerException e) {
+            response = FutureUtils.completedExceptionally(e);
+        }
+
+        return response.thenAccept(
+                resp ->
+                        HandlerUtils.sendResponse(
+                                ctx,
+                                httpRequest,
+                                resp,
+                                messageHeaders.getResponseStatusCode(),
+                                responseHeaders));
+    }
+
+    /**
+     * This method is called for every incoming request and returns a {@link 
CompletableFuture}
+     * containing a the response.
+     *
+     * <p>Implementations may decide whether to throw {@link 
RestHandlerException}s or fail the
+     * returned {@link CompletableFuture} with a {@link RestHandlerException}.
+     *
+     * <p>Failing the future with another exception type or throwing unchecked 
exceptions is
+     * regarded as an implementation error as it does not allow us to provide 
a meaningful HTTP
+     * status code. In this case a {@link 
HttpResponseStatus#INTERNAL_SERVER_ERROR} will be
+     * returned.
+     *
+     * @param version request version
+     * @param request request that should be handled
+     * @return future containing a handler response
+     * @throws RestHandlerException if the handling failed
+     */
+    protected abstract CompletableFuture<P> handleRequest(
+            @Nullable RestEndpointVersion version, @Nonnull HandlerRequest<R> 
request)
+            throws RestHandlerException;
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static class EmptyLeaderAdapter implements RestfulGateway {

Review Comment:
   I'd suggest naming this class `NonLeaderRetrievalRestfulGateway` and moving 
it to `flink-runtime`.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.util.RestEndpointVersion;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Super class for sql gateway handlers that work with {@link RequestBody}s 
and {@link
+ * ResponseBody}s.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+public abstract class AbstractSqlGatewayRestHandler<
+                R extends RequestBody, P extends ResponseBody, M extends 
MessageParameters>
+        extends AbstractHandler<RestfulGateway, R, M> {
+
+    private static final EmptyLeaderAdapter EMPTY_LEADER_ADAPTER = new 
EmptyLeaderAdapter();

Review Comment:
   This class should guarantee its own singleton property, rather than relying on its user to create only one instance.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.util.RestEndpointVersion;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Super class for sql gateway handlers that work with {@link RequestBody}s 
and {@link
+ * ResponseBody}s.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+public abstract class AbstractSqlGatewayRestHandler<
+                R extends RequestBody, P extends ResponseBody, M extends 
MessageParameters>
+        extends AbstractHandler<RestfulGateway, R, M> {

Review Comment:
   ```suggestion
           extends AbstractHandler<EmptyLeaderAdapter, R, M> {
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.util.RestEndpointVersion;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Super class for sql gateway handlers that work with {@link RequestBody}s 
and {@link
+ * ResponseBody}s.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+public abstract class AbstractSqlGatewayRestHandler<
+                R extends RequestBody, P extends ResponseBody, M extends 
MessageParameters>
+        extends AbstractHandler<RestfulGateway, R, M> {
+
+    private static final EmptyLeaderAdapter EMPTY_LEADER_ADAPTER = new 
EmptyLeaderAdapter();
+    protected SqlGatewayService service;
+
+    private final MessageHeaders<R, P, M> messageHeaders;
+
+    protected AbstractSqlGatewayRestHandler(
+            SqlGatewayService service,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<R, P, M> messageHeaders) {
+        super(
+                () -> CompletableFuture.completedFuture(EMPTY_LEADER_ADAPTER),
+                timeout,
+                responseHeaders,
+                messageHeaders);
+        this.service = service;
+        this.messageHeaders = messageHeaders;
+    }
+
+    @Override
+    protected CompletableFuture<Void> respondToRequest(
+            ChannelHandlerContext ctx,
+            HttpRequest httpRequest,
+            HandlerRequest<R> handlerRequest,
+            RestfulGateway gateway) {
+        CompletableFuture<P> response;
+
+        String uri = httpRequest.uri();
+        int slashIndex = uri.indexOf('/', 1);
+        if (slashIndex < 0) {
+            slashIndex = uri.length();
+        }
+
+        RestEndpointVersion version = 
RestEndpointVersion.valueOf(uri.substring(1, slashIndex));

Review Comment:
   API version is a common property. I think we should include it in 
`HandlerRequest`. It can be parsed in e.g. `Router#route`, or somewhere else on 
the common path.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointConfigOptions.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Options to configure {@link 
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint}. */
+@PublicEvolving
+public class SqlGatewayRestEndpointConfigOptions {
+
+    public static final String GATEWAY_ENDPOINT_PREFIX = 
"sql-gateway.endpoint.";
+
+    public static final ConfigOption<String> BIND_ADDRESS =
+            key("rest.bind-address")
+                    .stringType()
+                    .defaultValue("0.0.0.0")
+                    .withDescription("The address that the rest endpoint binds 
itself.");
+
+    public static final ConfigOption<String> BIND_PORT =
+            key("rest.bind-port")
+                    .stringType()
+                    .defaultValue("8083")

Review Comment:
   Do we support configuring a list/range of ports? Also, what about the 
external port?
   
   See `RestOptions`.



##########
flink-table/flink-sql-gateway/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory:
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory

Review Comment:
   It seems this class does not exist.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointConfigOptions.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Options to configure {@link 
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint}. */
+@PublicEvolving
+public class SqlGatewayRestEndpointConfigOptions {
+
+    public static final String GATEWAY_ENDPOINT_PREFIX = 
"sql-gateway.endpoint.";

Review Comment:
   Why not just include the prefix in the config options?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointConfigOptions.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Options to configure {@link 
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint}. */
+@PublicEvolving
+public class SqlGatewayRestEndpointConfigOptions {

Review Comment:
   ```suggestion
   public class SqlGatewayRestOptions {
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RestEndpointVersion.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+
+/** Rest endpoint version. */
+public enum RestEndpointVersion implements EndpointVersion {

Review Comment:
   We probably need to reuse `o.a.f.runtime.rest.versioning.RestAPIVersion`. We 
can have different enum values for runtime and sql-gateway, while sharing the 
remaining common logic.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.util.RestEndpointVersion;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Super class for sql gateway handlers that work with {@link RequestBody}s 
and {@link
+ * ResponseBody}s.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+public abstract class AbstractSqlGatewayRestHandler<
+                R extends RequestBody, P extends ResponseBody, M extends 
MessageParameters>
+        extends AbstractHandler<RestfulGateway, R, M> {
+
+    private static final EmptyLeaderAdapter EMPTY_LEADER_ADAPTER = new 
EmptyLeaderAdapter();
+    protected SqlGatewayService service;
+
+    private final MessageHeaders<R, P, M> messageHeaders;
+
+    protected AbstractSqlGatewayRestHandler(
+            SqlGatewayService service,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<R, P, M> messageHeaders) {
+        super(
+                () -> CompletableFuture.completedFuture(EMPTY_LEADER_ADAPTER),
+                timeout,
+                responseHeaders,
+                messageHeaders);
+        this.service = service;
+        this.messageHeaders = messageHeaders;
+    }
+
+    @Override
+    protected CompletableFuture<Void> respondToRequest(
+            ChannelHandlerContext ctx,
+            HttpRequest httpRequest,
+            HandlerRequest<R> handlerRequest,
+            RestfulGateway gateway) {
+        CompletableFuture<P> response;
+
+        String uri = httpRequest.uri();
+        int slashIndex = uri.indexOf('/', 1);
+        if (slashIndex < 0) {
+            slashIndex = uri.length();
+        }
+
+        RestEndpointVersion version = 
RestEndpointVersion.valueOf(uri.substring(1, slashIndex));
+
+        try {
+            response = handleRequest(version, handlerRequest);
+        } catch (RestHandlerException e) {
+            response = FutureUtils.completedExceptionally(e);
+        }
+
+        return response.thenAccept(
+                resp ->
+                        HandlerUtils.sendResponse(
+                                ctx,
+                                httpRequest,
+                                resp,
+                                messageHeaders.getResponseStatusCode(),
+                                responseHeaders));
+    }
+
+    /**
+     * This method is called for every incoming request and returns a {@link 
CompletableFuture}
+     * containing a the response.
+     *
+     * <p>Implementations may decide whether to throw {@link 
RestHandlerException}s or fail the
+     * returned {@link CompletableFuture} with a {@link RestHandlerException}.
+     *
+     * <p>Failing the future with another exception type or throwing unchecked 
exceptions is
+     * regarded as an implementation error as it does not allow us to provide 
a meaningful HTTP
+     * status code. In this case a {@link 
HttpResponseStatus#INTERNAL_SERVER_ERROR} will be
+     * returned.
+     *
+     * @param version request version
+     * @param request request that should be handled
+     * @return future containing a handler response
+     * @throws RestHandlerException if the handling failed
+     */
+    protected abstract CompletableFuture<P> handleRequest(
+            @Nullable RestEndpointVersion version, @Nonnull HandlerRequest<R> 
request)
+            throws RestHandlerException;
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static class EmptyLeaderAdapter implements RestfulGateway {
+
+        private static final String MESSAGE = "EmptyLeaderAdapter doesn't 
support the operation.";
+
+        private EmptyLeaderAdapter() {}
+
+        @Override
+        public String getAddress() {
+            throw new UnsupportedOperationException(MESSAGE);
+        }
+
+        @Override
+        public String getHostname() {
+            return "localhost";

Review Comment:
   Why doesn't this throw `UnsupportedOperationException`, like `getAddress()` does?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/AbstractSqlGatewayRestHandler.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.util.RestEndpointVersion;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Super class for sql gateway handlers that work with {@link RequestBody}s 
and {@link
+ * ResponseBody}s.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+public abstract class AbstractSqlGatewayRestHandler<
+                R extends RequestBody, P extends ResponseBody, M extends 
MessageParameters>
+        extends AbstractHandler<RestfulGateway, R, M> {
+
+    private static final EmptyLeaderAdapter EMPTY_LEADER_ADAPTER = new 
EmptyLeaderAdapter();
+    protected SqlGatewayService service;
+
+    private final MessageHeaders<R, P, M> messageHeaders;
+
+    protected AbstractSqlGatewayRestHandler(
+            SqlGatewayService service,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<R, P, M> messageHeaders) {
+        super(
+                () -> CompletableFuture.completedFuture(EMPTY_LEADER_ADAPTER),
+                timeout,
+                responseHeaders,
+                messageHeaders);
+        this.service = service;
+        this.messageHeaders = messageHeaders;
+    }
+
+    @Override
+    protected CompletableFuture<Void> respondToRequest(
+            ChannelHandlerContext ctx,
+            HttpRequest httpRequest,
+            HandlerRequest<R> handlerRequest,
+            RestfulGateway gateway) {
+        CompletableFuture<P> response;
+
+        String uri = httpRequest.uri();
+        int slashIndex = uri.indexOf('/', 1);
+        if (slashIndex < 0) {
+            slashIndex = uri.length();
+        }
+
+        RestEndpointVersion version = 
RestEndpointVersion.valueOf(uri.substring(1, slashIndex));

Review Comment:
   Moreover, this does not seem to handle the case where the API version is not specified in the URI.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to