Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167917093

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. Calls are blocking; how long they block
+ * depends on the response time of the Flink cluster. The execution of Flink jobs is not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+	private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml";
+
+	private final Environment environment;
+	private final List<URL> dependencies;
+	private final Configuration flinkConfig;
+	private final ResultStore resultStore;
+
+	public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL> libraries) {
+
+		final String flinkConfigDir;
+		try {
+			// find the configuration directory
+			flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
+			// load the global configuration
+			flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);
+		} catch (Exception e) {
+			throw new SqlClientException("Could not load Flink configuration.", e);
+		}
+
+		// try to find a default environment
+		if (defaultEnv == null) {
+			final String defaultFilePath = flinkConfigDir + "/" + DEFAULT_ENV_FILE;
+			System.out.println("No default environment specified.");
+			System.out.print("Searching for '" + defaultFilePath + "'...");
+			final File file = new File(defaultFilePath);
+			if (file.exists()) {
+				System.out.println("found.");
+				try {
+					defaultEnv = Path.fromLocalFile(file).toUri().toURL();
+				} catch (MalformedURLException e) {
+					throw new RuntimeException(e);
+				}
+			} else {
+				System.out.println("not found.");
+			}
+		}
+
+		// inform user
+		if (defaultEnv != null) {
+			System.out.println("Reading default environment from: " + defaultEnv);
+			try {
+				environment = Environment.parse(defaultEnv);
+			} catch (IOException e) {
+				throw new SqlClientException("Could not read default environment file at: " + defaultEnv, e);
+			}
+		} else {
+			environment = new Environment();
+		}
+
+		// discover dependencies
+		dependencies = new ArrayList<>();
+		try {
+			// find jar files
+			for (URL url : jars) {
+				JobWithJars.checkJarFile(url);
+				dependencies.add(url);
+			}
+
+			// find jar files in library directories
+			for (URL libUrl : libraries) {
+				final File dir = new File(libUrl.toURI());
+				if (!dir.isDirectory() || !dir.canRead()) {
+					throw new SqlClientException("Directory cannot be read: " + dir);
+				}
+				final File[] files = dir.listFiles();
+				if (files == null) {
+					throw new SqlClientException("Directory cannot be read: " + dir);
+				}
+				for (File f : files) {
+					// only consider jars
+					if (f.isFile() && f.getAbsolutePath().toLowerCase().endsWith(".jar")) {
+						final URL url = f.toURI().toURL();
+						JobWithJars.checkJarFile(url);
+						dependencies.add(url);
+					}
+				}
+			}
+		} catch (Exception e) {
+			throw new SqlClientException("Could not load all required JAR files.", e);
+		}
+
+		// prepare result store
+		resultStore = new ResultStore(flinkConfig);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void start() {
+		// nothing to do yet
+	}
+
+	@Override
+	public Map<String, String> getSessionProperties(SessionContext context) throws SqlExecutionException {
+		final Environment env = getEnvironment(context);
+		final Map<String, String> properties = new HashMap<>();
+		properties.putAll(env.getExecution().toProperties());
+		properties.putAll(env.getDeployment().toProperties());
+		return properties;
+	}
+
+	@Override
+	public List<String> listTables(SessionContext context) throws SqlExecutionException {
+		final Environment env = getEnvironment(context);
+		final TableEnvironment tableEnv = getTableEnvironment(env);
+		return Arrays.asList(tableEnv.listTables());
+	}
+
+	@Override
+	public TableSchema getTableSchema(SessionContext context, String name) throws SqlExecutionException {
+		final Environment env = getEnvironment(context);
+		final TableEnvironment tableEnv = getTableEnvironment(env);
+		try {
+			return tableEnv.scan(name).getSchema();
+		} catch (TableException e) {
+			return null; // no table with this name found
+		}
+	}
+
+	@Override
+	public String explainStatement(SessionContext context, String statement) throws SqlExecutionException {
+		final Environment env = getEnvironment(context);
+		final TableEnvironment tableEnv = getTableEnvironment(env);
+		try {
+			final Table table = tableEnv.sqlQuery(statement);
+			return tableEnv.explain(table);
+		} catch (Throwable t) {
+			// catch everything such that the query does not crash the executor
+			throw new SqlExecutionException("Invalid SQL statement.", t);
+		}
+	}
+
+	@Override
+	public ResultDescriptor executeQuery(SessionContext context, String query) throws SqlExecutionException {
+		final Environment env = getEnvironment(context);
+
+		// deployment
+		final ClusterClient<?> clusterClient = prepareDeployment(env.getDeployment());
+
+		// initialize result
+		final DynamicResult result = resultStore.createResult(env);
+
+		// create plan with jars
+		final Tuple2<JobGraph, TableSchema> plan = createPlan(context.getName(), env, query, result.getTableSink(), clusterClient);
+
+		// store the result with a unique id (the job id for now)
+		final String resultId = plan.f0.getJobID().toString();
+		resultStore.storeResult(resultId, result);
+
+		// create class loader
+		final ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
+			dependencies,
+			Collections.emptyList(),
+			this.getClass().getClassLoader());
+
+		// create execution
+		final Runnable program = () -> {
+			// we need to submit the job attached for now
+			// otherwise it is not possible to retrieve the reason why an execution failed
+			try {
+				clusterClient.run(plan.f0, classLoader);
+			} catch (ProgramInvocationException e) {
+				throw new SqlExecutionException("Could not execute table program.", e);
+			} finally {
+				try {
+					clusterClient.shutdown();
+				} catch (Exception e) {
+					// ignore
+				}
+			}
+		};
+
+		// start result retrieval
+		result.startRetrieval(program);
+
+		return new ResultDescriptor(resultId, plan.f1, result.isChangelog());
+	}
+
+	@Override
+	public TypedResult<Tuple2<Boolean, Row>> retrieveResultRecord(SessionContext context, String resultId) throws SqlExecutionException {
+		final DynamicResult result = resultStore.getResult(resultId);
+		if (result == null) {
+			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
+		}
+		if (!result.isChangelog()) {
+			throw new SqlExecutionException("Invalid result retrieval mode.");
+		}
+		return result.retrieveRecord();
+	}
+
+	@Override
+	public TypedResult<Integer> snapshotResult(SessionContext context, String resultId, int pageSize) throws SqlExecutionException {
+		final DynamicResult result = resultStore.getResult(resultId);
+		if (result == null) {
+			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
+		}
+		if (result.isChangelog()) {
+			throw new SqlExecutionException("Invalid result retrieval mode.");
+		}
+		return result.snapshot(pageSize);
+	}
+
+	@Override
+	public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
+		final DynamicResult result = resultStore.getResult(resultId);
+		if (result == null) {
+			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
+		}
+		if (result.isChangelog()) {
+			throw new SqlExecutionException("Invalid result retrieval mode.");
+		}
+		return result.retrievePage(page);
+	}
+
+	@Override
+	public void cancelQuery(SessionContext context, String resultId) throws SqlExecutionException {
+		final DynamicResult result = resultStore.getResult(resultId);
+		if (result == null) {
+			throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
+		}
+
+		// stop retrieval and remove the result
+		result.close();
+		resultStore.removeResult(resultId);
+
+		// stop Flink job
+		final Environment env = getEnvironment(context);
+		final ClusterClient<?> clusterClient = prepareDeployment(env.getDeployment());
+		try {
+			clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId)));
+		} catch (Exception e) {
+			// the job might have finished earlier
+		} finally {
+			try {
+				clusterClient.shutdown();
+			} catch (Exception e) {
+				// ignore
+			}
+		}
+	}
+
+	@Override
+	public void stop(SessionContext context) {
+		resultStore.getResults().forEach((resultId) -> {
+			try {
+				cancelQuery(context, resultId);
+			} catch (Throwable e) {
+				// ignore any throwable to keep the cleanup running
+			}
+		});
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private List<URL> collectJars() {
+		return Collections.emptyList();
+	}
+
+	private Tuple2<JobGraph, TableSchema> createPlan(String name, Environment env, String query, TableSink<?> sink, ClusterClient<?> clusterClient) {
+		final TableEnvironment tableEnv = getTableEnvironment(env);
+
+		final QueryConfig queryConfig = getQueryConfig(env.getExecution());
+
+		// parse and validate query
+		final Table table;
+		try {
+			table = tableEnv.sqlQuery(query);
+			table.writeToSink(sink, queryConfig);
+		} catch (Throwable t) {
+			// catch everything such that the query does not crash the executor
+			throw new SqlExecutionException("Invalid SQL statement.", t);
+		}
+
+		// extract plan
+		final FlinkPlan plan;
+		if (env.getExecution().isStreamingExecution()) {
+			final StreamGraph graph = ((StreamTableEnvironment) tableEnv).execEnv().getStreamGraph();
+			graph.setJobName(name + ": " + query);
+			plan = graph;
+		} else {
+			final int parallelism = env.getExecution().getParallelism();
+			final Plan unoptimizedPlan = ((BatchTableEnvironment) tableEnv).execEnv().createProgramPlan();
+			unoptimizedPlan.setJobName(name + ": " + query);
+			final Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(),
+				clusterClient.getFlinkConfiguration());
+			plan = ClusterClient.getOptimizedPlan(compiler, unoptimizedPlan, parallelism);
+		}
+
+		// create job graph
+		final JobGraph jobGraph = clusterClient.getJobGraph(
+			plan, dependencies, Collections.emptyList(), SavepointRestoreSettings.none());
+
+		return Tuple2.of(jobGraph, table.getSchema());
+	}
+
+	private ClusterClient<?> prepareDeployment(Deployment deploy) {
+
+		// change some configuration options to be more responsive
+		flinkConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, deploy.getResponseTimeout() + " ms");
+		flinkConfig.setString(AkkaOptions.CLIENT_TIMEOUT, deploy.getResponseTimeout() + " ms");
+
+		// get cluster client
+		final ClusterClient<?> clusterClient;
+		if (deploy.isStandaloneDeployment()) {
+			clusterClient = getStandaloneClusterClient(flinkConfig);
+			clusterClient.setPrintStatusDuringExecution(false);
+		} else {
+			throw new SqlExecutionException("Unsupported deployment.");
+		}
+
+		return clusterClient;
+	}
+
+	private ClusterClient<?> getStandaloneClusterClient(Configuration configuration) {
+		final ClusterDescriptor<StandaloneClusterId> descriptor = new StandaloneClusterDescriptor(configuration);
+		try {
+			return descriptor.retrieve(StandaloneClusterId.getInstance());
+		} catch (ClusterRetrieveException e) {
+			throw new SqlExecutionException("Could not retrieve standalone cluster.", e);
+		}
+	}
+
+	private Environment getEnvironment(SessionContext context) {
+		return Environment.merge(environment, context.getEnvironment());
+	}
+
+	private TableEnvironment getTableEnvironment(Environment env) {
--- End diff --

Should we cache the `TableEnvironment`? It is basically created for each interaction with the executor.
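For illustration, a minimal sketch of what such a cache might look like, keyed by the session (the `tableEnvCache` field and the `getOrCreateTableEnvironment` helper are hypothetical names, and this assumes `SessionContext` has usable `equals()`/`hashCode()` semantics):

```java
// Hypothetical per-session cache; entries would additionally need to be
// invalidated whenever the session's environment/properties change.
private final Map<SessionContext, TableEnvironment> tableEnvCache = new HashMap<>();

private TableEnvironment getOrCreateTableEnvironment(SessionContext context) {
	// reuse the cached environment if present, otherwise build and cache one
	return tableEnvCache.computeIfAbsent(
		context,
		ctx -> getTableEnvironment(getEnvironment(ctx)));
}
```

Invalidation seems to be the tricky part: a cached `TableEnvironment` becomes stale as soon as the merged environment changes, so the cache key (or an eviction hook) would have to take the session properties into account.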