SteNicholas commented on code in PR #107: URL: https://github.com/apache/incubator-paimon-webui/pull/107#discussion_r1428776828
########## paimon-web-common/src/main/java/org/apache/paimon/web/common/executor/Executor.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.executor; + +import org.apache.paimon.web.common.result.FetchResultParams; +import org.apache.paimon.web.common.result.SubmitResult; + +/** The Executor interface. */ Review Comment: Could you describe this interface in more detail? ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/executor/Executor.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.executor; + +import org.apache.paimon.web.common.result.FetchResultParams; +import org.apache.paimon.web.common.result.SubmitResult; + +/** The Executor interface. */ +public interface Executor { + + SubmitResult executeSql(String statement) throws Exception; + + SubmitResult fetchResults(FetchResultParams params) throws Exception; + + boolean stop(String jobId, boolean withSavepoint, boolean withDrain) throws Exception; Review Comment: Is the executor only defined for Flink jobs? What is the behavior of the `withSavepoint` and `withDrain` params for a Spark job? ########## paimon-web-flink/src/main/java/org/apache/paimon/web/flink/executor/FlinkExecutorFactory.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.web.flink.executor; + +import org.apache.paimon.web.common.executor.Executor; +import org.apache.paimon.web.common.executor.ExecutorFactory; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; + +/** Factory class for creating executors that interface with the Flink Table API. */ +public class FlinkExecutorFactory implements ExecutorFactory { + + private final RuntimeExecutionMode mode; + + private final Configuration configuration; + + public FlinkExecutorFactory(RuntimeExecutionMode mode, Configuration configuration) { + this.mode = mode; + this.configuration = configuration; + } + + @Override + public Executor createExecutor() { + EnvironmentSettings environmentSettings = getEnvironmentSettings(mode); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + TableEnvironment tableEvn = StreamTableEnvironmentImpl.create(env, environmentSettings); + return new FlinkExecutor(env, tableEvn); + } + + private EnvironmentSettings getEnvironmentSettings(RuntimeExecutionMode mode) { + return mode == RuntimeExecutionMode.BATCH Review Comment: What's the behavior of `RuntimeExecutionMode.DYNAMIC`? ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/executor/Executor.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.executor; + +import org.apache.paimon.web.common.result.FetchResultParams; +import org.apache.paimon.web.common.result.SubmitResult; + +/** The Executor interface. */ +public interface Executor { + + SubmitResult executeSql(String statement) throws Exception; Review Comment: Add comments to all interface methods. ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/result/SubmitResult.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.web.common.result; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** This class represents the result of a job. Including flink job and spark Job. */ Review Comment: Why does `SubmitResult` represent the result of a job? IMO, `SubmitResult` only means the submission result of a job. BTW, the `Including flink job and spark Job.` could be removed. ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/function/Supplier.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.function; + +/** Supplier Functional interface that can use exceptions. */ +@FunctionalInterface +public interface Supplier<T> { Review Comment: Why introduce this interface? Why not use `java.util.function.Supplier`? ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/result/SubmitResult.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.result; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** This class represents the result of a job. Including flink job and spark Job. */ +public class SubmitResult implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String submitId; + private final String jobId; + private final String status; + private final List<Map<String, Object>> data; + private final boolean shouldFetchResult; + + private SubmitResult(Builder builder) { + this.jobId = builder.jobId; + this.status = builder.status; + this.data = builder.data; + this.submitId = builder.submitId; + this.shouldFetchResult = builder.shouldFetchResult; + } + + public String getSubmitId() { + return submitId; + } + + public String getJobId() { + return jobId; + } + + public String getStatus() { + return status; + } + + public List<Map<String, Object>> getData() { + return data; + } + + public boolean shouldFetchResult() { + return shouldFetchResult; + } + + public static Builder builder() { + return new Builder(); + } + + /** The builder for SubmitResult. 
*/ + public static class Builder { + + private String submitId; + private String jobId; + private String status; + private List<Map<String, Object>> data = new ArrayList<>(); + private boolean shouldFetchResult; + + public Builder submitId(String submitId) { + this.submitId = submitId; + return this; + } + + public Builder jobId(String jobId) { + this.jobId = jobId; + return this; + } + + public Builder status(String status) { + this.status = status; + return this; + } + + public Builder data(List<Map<String, Object>> data) { + this.data = data; + return this; + } + + public Builder addData(Map<String, Object> dataItem) { + this.data.add(dataItem); + return this; + } + + public Builder shouldFetchResult(boolean shouldFetchResult) { + this.shouldFetchResult = shouldFetchResult; + return this; + } + + public SubmitResult build() { + return new SubmitResult(this); + } + } + + @Override + public String toString() { + return "SubmitResult{" Review Comment: Does the `shouldFetchResult` need to be included in `toString`? ########## paimon-web-common/pom.xml: ########## @@ -38,11 +38,6 @@ under the License. </properties> <dependencies> - <dependency> Review Comment: Why remove this dependency? Is this change related to job submission? ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/result/SubmitResult.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.result; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** This class represents the result of a job. Including flink job and spark Job. */ +public class SubmitResult implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String submitId; Review Comment: What's the purpose of definition for `submitId`? ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/executor/ExecutorFactory.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.executor; + +/** The ExecutorFactory interface provides a method to create an Executor. 
*/ +public interface ExecutorFactory { + + Executor createExecutor() throws Exception; Review Comment: Add comments to all interface methods. ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/result/FetchResultParams.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.result; + +/** Represents the parameters required to fetch the results of a certain operation. */ +public class FetchResultParams { + + private final String sessionId; + private final String submitId; + private final Long token; + + private FetchResultParams(Builder builder) { Review Comment: I don't think the constructor needs to take the builder as a parameter. The builder pattern isn't used this way. ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/result/SubmitResult.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.common.result; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** This class represents the result of a job. Including flink job and spark Job. */ +public class SubmitResult implements Serializable { Review Comment: ```suggestion public class SubmissionResult implements Serializable { ``` ########## paimon-web-common/src/main/java/org/apache/paimon/web/common/executor/ExecutorFactory.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.web.common.executor; + +/** The ExecutorFactory interface provides a method to create an Executor. */ Review Comment: ```suggestion /** This factory is defined to create an {@link Executor} instance. */ ``` ########## paimon-web-flink/src/main/java/org/apache/paimon/web/flink/executor/FlinkExecutorFactory.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.flink.executor; + +import org.apache.paimon.web.common.executor.Executor; +import org.apache.paimon.web.common.executor.ExecutorFactory; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; + +/** Factory class for creating executors that interface with the Flink Table API. 
*/ +public class FlinkExecutorFactory implements ExecutorFactory { + + private final RuntimeExecutionMode mode; + + private final Configuration configuration; + + public FlinkExecutorFactory(RuntimeExecutionMode mode, Configuration configuration) { + this.mode = mode; + this.configuration = configuration; + } + + @Override + public Executor createExecutor() { + EnvironmentSettings environmentSettings = getEnvironmentSettings(mode); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + TableEnvironment tableEvn = StreamTableEnvironmentImpl.create(env, environmentSettings); Review Comment: ```suggestion TableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, environmentSettings); ``` ########## paimon-web-flink/src/main/java/org/apache/paimon/web/flink/exception/SqlExecutionException.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.flink.exception; + +/** Exception class for SQL execution issues. */ Review Comment: ```suggestion /** Exception of SQL execution. 
*/ ``` ########## paimon-web-flink/src/main/java/org/apache/paimon/web/flink/executor/FlinkExecutor.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.flink.executor; + +import org.apache.paimon.web.common.executor.Executor; +import org.apache.paimon.web.common.result.FetchResultParams; +import org.apache.paimon.web.common.result.SubmitResult; +import org.apache.paimon.web.flink.exception.SqlExecutionException; +import org.apache.paimon.web.flink.operation.FlinkSqlOperationType; +import org.apache.paimon.web.flink.parser.StatementParser; +import org.apache.paimon.web.flink.utils.CollectResultUtil; +import org.apache.paimon.web.flink.utils.FormatSqlExceptionUtil; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** The flink implementation of the {@link Executor}. 
*/ +public class FlinkExecutor implements Executor { + + private static final String EXECUTE_SUCCESS = "OK"; + private final StreamExecutionEnvironment env; + private final TableEnvironment tableEnv; + + public FlinkExecutor(StreamExecutionEnvironment env, TableEnvironment tableEnv) { + this.env = env; + this.tableEnv = tableEnv; + } + + @Override + public SubmitResult executeSql(String multiStatement) throws SqlExecutionException { + List<String> insertStatements = new ArrayList<>(); + String[] statements = StatementParser.parse(multiStatement); + for (String statement : statements) { + FlinkSqlOperationType operationType = FlinkSqlOperationType.getOperationType(statement); + switch (operationType.getCategory()) { + case DQL: + if (insertStatements.isEmpty()) { + return executeQueryStatement(statement); + } + break; + case DML: + if (operationType.getType().equals(FlinkSqlOperationType.INSERT.getType())) { + insertStatements.add(statement); + } else if (insertStatements.isEmpty()) { + return executeDmlStatement(statement); + } + break; + default: + executeStatementWithoutResult(statement); + break; + } + } + return executeInsertStatements(insertStatements); Review Comment: Is insert statement same as DML? ########## paimon-web-flink/src/main/java/org/apache/paimon/web/flink/executor/FlinkExecutor.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.flink.executor; + +import org.apache.paimon.web.common.executor.Executor; +import org.apache.paimon.web.common.result.FetchResultParams; +import org.apache.paimon.web.common.result.SubmitResult; +import org.apache.paimon.web.flink.exception.SqlExecutionException; +import org.apache.paimon.web.flink.operation.FlinkSqlOperationType; +import org.apache.paimon.web.flink.parser.StatementParser; +import org.apache.paimon.web.flink.utils.CollectResultUtil; +import org.apache.paimon.web.flink.utils.FormatSqlExceptionUtil; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** The flink implementation of the {@link Executor}. 
*/ +public class FlinkExecutor implements Executor { + + private static final String EXECUTE_SUCCESS = "OK"; + private final StreamExecutionEnvironment env; + private final TableEnvironment tableEnv; + + public FlinkExecutor(StreamExecutionEnvironment env, TableEnvironment tableEnv) { + this.env = env; + this.tableEnv = tableEnv; + } + + @Override + public SubmitResult executeSql(String multiStatement) throws SqlExecutionException { + List<String> insertStatements = new ArrayList<>(); + String[] statements = StatementParser.parse(multiStatement); + for (String statement : statements) { + FlinkSqlOperationType operationType = FlinkSqlOperationType.getOperationType(statement); + switch (operationType.getCategory()) { + case DQL: + if (insertStatements.isEmpty()) { Review Comment: Could empty insert statement be handled early? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
