jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028512299


##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. The connection to the SQL gateway and
+ * query execution are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+    // Logger shared by all instances of this class.
+    private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+    // Client through which every request of this executor is sent.
+    private final RestClient restClient;
+    // Holds the dynamic results that are looked up by result id.
+    private final ResultStore resultStore;
+    // Started in start() and interrupted in close().
+    private final KeepingAliveThread keepingAliveThread;
+
+    // Session identity; all three are assigned in start() once the session has been opened.
+    private String sessionHandleId;
+    private SessionHandle sessionHandle;
+    private SessionMessageParameters sessionMessageParametersInstance;
+
+    // Endpoint that requests are sent to.
+    private final String address;
+    private final int port;
+
+    /** Creates a remote executor for submitting table programs and retrieving 
results. */
+    public RemoteExecutor(Configuration flinkConfig, String address, int port)
+            throws SqlClientException {
+        this.address = address;
+        this.port = port;
+        // init rest client
+        try {
+            restClient = new RestClient(flinkConfig, 
Executors.directExecutor());
+        } catch (ConfigurationException e) {
+            LOG.error("Cannot get rest client.", e);
+            throw new SqlClientException("Cannot get rest client.", e);
+        }
+        this.resultStore = new ResultStore();
+        this.keepingAliveThread = new KeepingAliveThread(10_000L);
+    }
+
+    public void start() throws SqlClientException {
+        // Open session to address:port and get the session handle ID
+        OpenSessionRequestBody request = new OpenSessionRequestBody(null, 
null);
+        try {
+            CompletableFuture<OpenSessionResponseBody> response =
+                    sendRequest(
+                            OpenSessionHeaders.getInstance(),
+                            EmptyMessageParameters.getInstance(),
+                            request);
+            sessionHandleId = response.get().getSessionHandle();
+            LOG.info("Open session '{}' to {}:{}.", sessionHandleId, address, 
port);
+        } catch (Exception e) {
+            LOG.error(String.format("Failed to open session to %s:%s", 
address, port), e);
+            throw new SqlClientException(
+                    String.format("Failed to open session to %s:%s", address, 
port), e);
+        }
+        sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId));
+        sessionMessageParametersInstance = new 
SessionMessageParameters(sessionHandle);
+        keepingAliveThread.start();
+    }
+
+    /**
+     * Cancels every result still held in the result store, asks the remote side to close the
+     * session and finally interrupts the keep-alive thread. Failures along the way are logged as
+     * warnings and otherwise ignored so that the remaining cleanup steps still run.
+     */
+    public void close() throws SqlClientException {
+        // One failing cancellation must not prevent the others from being attempted.
+        resultStore
+                .getResults()
+                .forEach(
+                        id -> {
+                            try {
+                                cancelQuery(id);
+                            } catch (Throwable t) {
+                                final String warning =
+                                        String.format(
+                                                "Unexpected error occurs when canceling query. Result ID: %s.",
+                                                id);
+                                LOG.warn(warning, t);
+                            }
+                        });
+
+        // Close the session; any problem is only logged.
+        try {
+            final CloseSessionResponseBody reply =
+                    sendRequest(
+                                    CloseSessionHeaders.getInstance(),
+                                    sessionMessageParametersInstance,
+                                    EmptyRequestBody.getInstance())
+                            .get();
+            final boolean closed = reply.getStatus().equals(CLOSE_MESSAGE);
+            if (!closed) {
+                LOG.warn("The status of close session response isn't {}.", CLOSE_MESSAGE);
+            }
+        } catch (Throwable t) {
+            final String warning =
+                    String.format(
+                            "Unexpected error occurs when closing session %s.", sessionHandleId);
+            LOG.warn(warning, t);
+        }
+
+        keepingAliveThread.interrupt();
+    }
+
+    public Map<String, String> getSessionConfig() throws SqlClientException {
+        try {
+            CompletableFuture<GetSessionConfigResponseBody> response =
+                    sendRequest(
+                            GetSessionConfigHeaders.getInstance(),
+                            sessionMessageParametersInstance,
+                            EmptyRequestBody.getInstance());
+            return response.get().getProperties();
+        } catch (Exception e) {
+            LOG.error("Failed to get session config.", e);
+            throw new SqlClientException("Failed to get session config.", e);
+        }
+    }
+
+    /**
+     * Returns completion candidates for a statement. This implementation ignores both arguments
+     * and always returns a new, empty, mutable list.
+     */
+    public List<String> completeStatement(String statement, int position) {
+        final List<String> noCandidates = new ArrayList<>();
+        return noCandidates;
+    }
+
+    public TableResultWrapper executeStatement(
+            String statement, long executionTimeOutMs, @Nullable Configuration 
executionConfig)
+            throws SqlClientException {
+        if (executionTimeOutMs <= 0) {
+            LOG.error("The timeout must be positive.");
+            throw new SqlClientException("The timeout must be positive.");
+        }
+
+        statement = statement.trim();
+        LOG.info("Executing SQL statement: '{}'...", statement);
+        Map<String, String> config =
+                executionConfig == null ? new HashMap<>() : 
executionConfig.toMap();
+        ExecuteStatementRequestBody request =
+                new ExecuteStatementRequestBody(statement, 0L, config);
+        try {
+            CompletableFuture<ExecuteStatementResponseBody> 
executeStatementResponse =
+                    sendRequest(
+                            ExecuteStatementHeaders.getInstance(),
+                            sessionMessageParametersInstance,
+                            request);
+
+            String operationHandleId = 
executeStatementResponse.get().getOperationHandle();
+            OperationHandle operationHandle =
+                    new OperationHandle(UUID.fromString(operationHandleId));
+
+            LOG.info("Fetching the first result...");
+            FetchResultsResponseBody fetchResultsResponse =
+                    fetchWhenResultsReady(operationHandle, 
Duration.ofMillis(executionTimeOutMs));
+            ResultSet firstResult = fetchResultsResponse.getResults();
+            Long nextToken = 
parseTokenFromUri(fetchResultsResponse.getNextResultUri());
+
+            TableResultWrapper result =
+                    new TableResultWrapper(this, operationHandle, firstResult, 
nextToken);
+
+            if (isQuery(statement)) {
+                storeResult(result, executionConfig);
+            }
+            return result;
+        } catch (Exception e) {
+            LOG.error("Unexpected error occurs when executing SQL statement.", 
e);
+            throw new SqlClientException(
+                    "Unexpected error occurs when executing SQL statement.", 
e);
+        }
+    }
+
+    public TypedResult<List<RowData>> retrieveResultChanges(String resultId) {
+        DynamicResult result = resultStore.getResult(resultId);
+        if (result == null) {
+            throw new SqlExecutionException(
+                    String.format(
+                            "Could not find a result with result identifier 
'%s'.", resultId));
+        }
+        if (result.isMaterialized()) {
+            throw new SqlClientException("Invalid result retrieval mode.");
+        }
+        return ((ChangelogResult) result).retrieveChanges();
+    }
+
+    /**
+     * Takes a snapshot of the materialized result stored under the given id.
+     *
+     * @param resultId identifier the result was stored under
+     * @param pageSize page size forwarded to {@code MaterializedResult#snapshot}
+     * @return whatever {@code MaterializedResult#snapshot} returns for that result
+     * @throws SqlExecutionException if the result is unknown or not materialized
+     */
+    public TypedResult<Integer> snapshotResult(String resultId, int pageSize) {
+        final DynamicResult stored = resultStore.getResult(resultId);
+        if (stored == null) {
+            final String notFound =
+                    String.format(
+                            "Could not find a result with result identifier '%s'.", resultId);
+            throw new SqlExecutionException(notFound);
+        }
+        if (stored.isMaterialized()) {
+            return ((MaterializedResult) stored).snapshot(pageSize);
+        }
+        throw new SqlExecutionException("Invalid result retrieval mode.");
+    }
+
+    /**
+     * Retrieves one page of the materialized result stored under the given id.
+     *
+     * @param resultId identifier the result was stored under
+     * @param page page number forwarded to {@code MaterializedResult#retrievePage}
+     * @return the rows returned by {@code MaterializedResult#retrievePage}
+     * @throws SqlExecutionException if the result is unknown or not materialized
+     */
+    public List<RowData> retrieveResultPage(String resultId, int page) {
+        final DynamicResult stored = resultStore.getResult(resultId);
+        if (stored == null) {
+            final String notFound =
+                    String.format(
+                            "Could not find a result with result identifier '%s'.", resultId);
+            throw new SqlExecutionException(notFound);
+        }
+        if (stored.isMaterialized()) {
+            return ((MaterializedResult) stored).retrievePage(page);
+        }
+        throw new SqlExecutionException("Invalid result retrieval mode.");
+    }
+
+    /**
+     * Closes the result stored under the given id and removes it from the result store.
+     *
+     * @param resultId identifier the result was stored under
+     * @throws SqlExecutionException if the result is unknown or closing it fails; in the latter
+     *     case the result is not removed from the store
+     */
+    public void cancelQuery(String resultId) throws SqlExecutionException {
+        final DynamicResult target = resultStore.getResult(resultId);
+        if (target == null) {
+            final String notFound =
+                    String.format(
+                            "Could not find a result with result identifier '%s'.", resultId);
+            throw new SqlExecutionException(notFound);
+        }
+
+        LOG.info("Cancelling job {} and result retrieval.", resultId);
+        try {
+            // According to the original author's note, closing the result also stops the Flink job.
+            target.close();
+        } catch (Throwable t) {
+            throw new SqlExecutionException("Could not cancel the query execution", t);
+        }
+
+        // Only reached when close() succeeded.
+        resultStore.removeResult(resultId);
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    private <
+                    M extends MessageHeaders<R, P, U>,
+                    U extends MessageParameters,
+                    R extends RequestBody,
+                    P extends ResponseBody>
+            CompletableFuture<P> sendRequest(M messageHeaders, U 
messageParameters, R request)
+                    throws IOException {
+        return restClient.sendRequest(address, port, messageHeaders, 
messageParameters, request);
+    }
+
+    private FetchResultsResponseBody fetchResults(OperationHandle 
operationHandle) {
+        return fetchResults(operationHandle, 0L);
+    }
+
+    public FetchResultsResponseBody fetchResults(OperationHandle 
operationHandle, long token)
+            throws SqlClientException {
+        FetchResultsTokenParameters fetchResultsTokenParameters =
+                new FetchResultsTokenParameters(sessionHandle, 
operationHandle, token);
+        try {
+            return sendRequest(
+                            FetchResultsHeaders.getInstance(),
+                            fetchResultsTokenParameters,
+                            EmptyRequestBody.getInstance())
+                    .get();
+        } catch (Exception e) {
+            LOG.error(
+                    String.format(
+                            "Unexpected error occurs when fetching results. 
OperationHandle ID: %s.",
+                            operationHandle),
+                    e);
+            throw new SqlClientException(

Review Comment:
   After a little more experimenting: this line rethrows the exception, which
   kills the SQL client process.
   
   As a simple example, suppose one tries to `describe` a non-existent table.
   
   With the local client, one sees:
   ```
   Flink SQL> describe someTable;
   [ERROR] Could not execute SQL statement. Reason:
   org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.someTable' doesn't exist
   ```
   
   In gateway mode, the result is a long stack trace, after which the client shuts down:
   
   ```
   Flink SQL> describe someTable;                                               
                                                                                
                      
                                                                                
                                                                                
                      
   
           Exception in thread "main" 
org.apache.flink.table.client.SqlClientException: Unexpected error occurs when 
fetching execution results .                                             
           at 
org.apache.flink.table.client.gateway.remote.RemoteExecutor.executeStatement(RemoteExecutor.java:219)
                                                                   
           at 
org.apache.flink.table.client.cli.CliClient.executeOperation(CliClient.java:667)
                                                                                
        
           at 
org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:472)   
                                                                                
        
           at 
org.apache.flink.table.client.cli.CliClient.executeOperation(CliClient.java:367)
                                                                                
        
           at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:323)
                                                                                
 
           at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:279)
                                                                                
      
           at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:227)
                                                                                
           at 
org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:148)             
                                                                                
        
           at org.apache.flink.table.client.SqlClient.start(SqlClient.java:96)  
                                                                                
                      
           at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:170)         
                                                                                
        
           at org.apache.flink.table.client.SqlClient.main(SqlClient.java:158)  
                                                                                
                      
   Caused by: org.apache.flink.table.client.SqlClientException: Unexpected 
error occurs when fetching results. OperationHandle ID: 
156c46b9-4278-4494-adb8-4339b8a71b20.
           at 
org.apache.flink.table.client.gateway.remote.RemoteExecutor.fetchResults(RemoteExecutor.java:389)
                                      
           at 
org.apache.flink.table.client.gateway.remote.RemoteExecutor.fetchResults(RemoteExecutor.java:368)
                                      
           at 
org.apache.flink.table.client.gateway.remote.RemoteExecutor.fetchWhenResultsReady(RemoteExecutor.java:406)
                           
           at 
org.apache.flink.table.client.gateway.remote.RemoteExecutor.executeStatement(RemoteExecutor.java:212)
               
           ... 10 more                                                          
                                                                                
                      
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., <Exception on server side:
   org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
fetchResults.
           at 
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:86)
                   
           at 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
      
           at 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
    
           at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
                            
           at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
                     
           at java.base/java.util.Optional.ifPresent(Optional.java:183)         
                                                                                
                      
           at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)      
                                                                                
        
           at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
           at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
           at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
                                                                          
           at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
                           
           at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
                       
           at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
           at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
     
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
           at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:210)
                                      
           at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
                   
           at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
           at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
           at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    
           at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
           at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
           at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
                   
           at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
           at java.base/java.lang.Thread.run(Thread.java:829)                   
                                                                                
                      
   Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
Failed to fetchResults.                                 
           at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:203)
            
           at 
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
           ... 48 more                                                          
                                                                                
                      
   Caused by: 
org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to 
execute the operation 156c46b9-4278-4494-adb8-4339b8a71b20.
           at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:391)
           at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:249)
           at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
   
           at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)              
                                                                                
        
           at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          
           at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)              
                                                                                
        
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           ... 1 more
   Caused by: org.apache.flink.table.api.ValidationException: Tables or views 
with the identifier 'default_catalog.default_database.someTable' doesn't exist
           at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1408)
           at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:121)
           at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$0(SqlGatewayServiceImpl.java:182)
           at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:111)
           at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:240)
           ... 7 more
   
   End of exception on server side>]
   ...
   Shutting down the session...
   done.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to