hackergin commented on code in PR #25700: URL: https://github.com/apache/flink/pull/25700#discussion_r1867278465
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/application/Printer.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.application; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringReader; +import java.util.Iterator; + +/** Printer to print the statement results in application mode. */ +public class Printer { + + @VisibleForTesting public static final String STATEMENT_BEGIN = "Flink SQL> "; + @VisibleForTesting public static final String LINE_BEGIN = "> "; + + @VisibleForTesting final PrintWriter writer; + + public Printer() { + this(System.out); + } + + public Printer(OutputStream output) { + this.writer = new PrintWriter(output, true); + } + + public void print(String statement) { + try (BufferedReader reader = new BufferedReader(new StringReader(statement))) { + writer.print(STATEMENT_BEGIN); + writer.println(reader.readLine()); + String line; + while ((line = reader.readLine()) != null) { + writer.print(LINE_BEGIN); + writer.println(line); Review Comment: Should a semicolon (;) be added to the end of for the last statement which is not end with (;) . ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/application/ScriptExecutor.java: ########## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.application; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.SqlParserEOFException; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationExecutor; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Executor to execute the statements. */ +public class ScriptExecutor { + + final SessionContext context; + final Printer printer; + + private ScriptExecutor(SessionContext context) { + this(context, new Printer()); + } + + public ScriptExecutor(SessionContext context, Printer printer) { + this.context = context; + this.printer = printer; + } + + public static ScriptExecutor of(SessionContext context) { + return new ScriptExecutor(context); + } + + public void execute(String script) { + ResultIterator iterator = new ResultIterator(script); + try { + while (iterator.hasNext()) { + Result result = iterator.next(); + printer.print(result.statement); + if (result.error != null) { + throw result.error; + } else { + printer.print(checkNotNull(result.fetcher)); + } + } + } catch (Throwable t) { + printer.print(t); + throw new SqlGatewayException("Failed to execute the script.", t); + } + } + + class ResultIterator implements Iterator<Result> { + + // these 3 string builders is here to pad the split sql to its original line and column + // number + StringBuilder previousPaddingSqlBuilder = new StringBuilder(); + StringBuilder currentPaddingSqlBuilder = new StringBuilder(); + StringBuilder currentPaddingLineBuilder = new StringBuilder(); + + private final String script; + private int position; + + private String statement; + private Throwable throwable; + private OperationExecutor executor; + + public ResultIterator(String script) { + this.script = script; + this.position = 0; + } + + @Override + public boolean hasNext() { + State state = State.NORMAL; + StringBuilder currentSqlBuilder = new StringBuilder(); + char currentChar = 0; + + boolean hasNext = false; + // rebuild the executor because statement may change planner. + executor = + new OperationExecutor( + context, + (config, classloader) -> + StreamExecutionEnvironment.getExecutionEnvironment(config)); + for (int i = position; i < script.length(); i++) { + char lastChar = currentChar; + currentChar = script.charAt(i); + + currentSqlBuilder.append(currentChar); + currentPaddingLineBuilder.append(" "); + + switch (currentChar) { + case '\'': + if (state == State.SINGLE_QUOTE) { + state = State.NORMAL; + } else if (state == State.NORMAL) { + state = State.SINGLE_QUOTE; + } + break; + case '"': + if (state == State.DOUBLE_QUOTE) { + state = State.NORMAL; + } else if (state == State.NORMAL) { + state = State.DOUBLE_QUOTE; + } + break; + case '`': + if (state == State.BACK_QUOTE) { + state = State.NORMAL; + } else if (state == State.NORMAL) { + state = State.BACK_QUOTE; + } + break; + case '-': + if (lastChar == '-' && state == State.NORMAL) { + state = State.SINGLE_COMMENT; + } + break; + case '\n': + if (state == State.SINGLE_COMMENT) { + state = State.NORMAL; + } + currentPaddingLineBuilder.setLength(0); + currentPaddingSqlBuilder.append("\n"); + break; + case '*': + if (lastChar == '/' && state == State.NORMAL) { + state = State.MULTI_LINE_COMMENT; + } + break; + case '/': + if (lastChar == '*' && state == State.MULTI_LINE_COMMENT) { + state = State.NORMAL; + } + break; + case ';': + if (state == State.NORMAL) { + i = + prefetch( + i + 1, + currentSqlBuilder, + currentPaddingSqlBuilder, + currentPaddingLineBuilder); + statement = currentSqlBuilder.toString(); + try { + position = i + 1; + parse(previousPaddingSqlBuilder + statement); Review Comment: Here, we performed parsing first. Is this to catch syntax errors in advance? ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.application; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; +import org.apache.flink.client.program.StreamContextEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCaseBase; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.utils.TestSqlStatement; +import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase to run the script with {@link ScriptExecutor}. */ +@ExtendWith(ParameterizedTestExtension.class) +public class ScriptExecutorITCase extends AbstractSqlGatewayStatementITCaseBase { + + private ScriptExecutor executor; + + @Override + protected String runStatements(List<TestSqlStatement> statements) throws Exception { + // Prepare the cluster + final Configuration configuration = new Configuration(); + configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); + configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, false); + configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true); + + final TestingMiniClusterConfiguration clusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + + // Execute + try (MiniCluster miniCluster = new MiniCluster(clusterConfiguration); + OutputStream outputStream = new ByteArrayOutputStream(1024)) { + miniCluster.start(); + MiniClusterPipelineExecutorServiceLoader loader = + new MiniClusterPipelineExecutorServiceLoader(miniCluster); + StreamContextEnvironment.setAsContext( + loader, + miniCluster.getConfiguration(), + ScriptExecutor.class.getClassLoader(), + false, + false); + + executor = + new TestScriptExecutor( + SessionContext.create( + new DefaultContext( + miniCluster.getConfiguration(), + Collections.emptyList()), + SessionHandle.create(), + SessionEnvironment.newBuilder() + .setSessionEndpointVersion( + SqlGatewayRestAPIVersion.getDefaultVersion()) + .build(), + Executors.newDirectExecutorService()), + new TestPrinter(statements.iterator(), outputStream)); + executor.execute( + String.join( + "\n", + statements.stream() + .map(TestSqlStatement::getSql) + .collect(Collectors.joining()))); + return outputStream.toString(); + } + } + + @Test + void testParseStatementWithComments() throws Exception { + assertThat(runScript("comment.q")) + .contains( + "+------+------+------+-----+--------+-----------+" + + System.lineSeparator() + + "| name | type | null | key | extras | watermark |" + + System.lineSeparator() + + "+------+------+------+-----+--------+-----------+" + + System.lineSeparator() + + "| a | INT | TRUE | | | |" + + System.lineSeparator() + + "| b | INT | TRUE | | | |" + + System.lineSeparator() + + "+------+------+------+-----+--------+-----------+"); + } + + @Test + void testParseErrorPositionIsCorrect() throws Exception { + assertThat(runScript("error.q")) + .contains( + "org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered \")\" at line 26, column 1."); + } + + @Override + protected boolean isStreaming() { + return executor.context.getSessionConf().get(RUNTIME_MODE) + == RuntimeExecutionMode.STREAMING; + } + + @Override + protected boolean skip() { + return parameters.getSqlPath().contains("set.q"); + } + + private static class TestScriptExecutor extends ScriptExecutor { + + public TestScriptExecutor(SessionContext context, Printer printer) { + super(context, printer); + } + + @Override + public void execute(String script) { + ResultIterator iterator = new ResultIterator(script); + while (true) { + try { + if (iterator.hasNext()) { + Result result = iterator.next(); + printer.print(result.statement); + if (result.error != null) { + throw result.error; + } else { + printer.print(checkNotNull(result.fetcher)); + } + } else { + break; + } + } catch (Throwable t) { + printer.print(t); + } + } + } + } + + private class TestPrinter extends Printer { + + private final Iterator<TestSqlStatement> statements; + private String statement; + + public TestPrinter(Iterator<TestSqlStatement> statements, OutputStream stream) { + super(stream); + this.statements = statements; + } + + @Override + public void print(String statement) { + if (statements.hasNext()) { + writer.print(statements.next().getComment()); + } + this.statement = statement; + writer.print(statement); + writer.flush(); + } + + @Override + public void print(ResultFetcher fetcher) { + StatementType statementType = StatementType.match(statement); + try { + writer.print( + ScriptExecutorITCase.this.toString( + statementType, + fetcher.getResultSchema(), + fetcher.getConverter(), + new RowDataIterator(fetcher))); + writer.flush(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void print(Throwable t) { + Throwable root = t; + while (root.getCause() != null + && root.getCause().getMessage() != null + && !root.getCause().getMessage().isEmpty()) { + root = root.getCause(); + } + writer.print( + Tag.ERROR.addTag( + root.getClass().getName() + + ": " + + removeRowNumber(root.getMessage().trim()) + + "\n")); + writer.flush(); + } + } + + private String runScript(String fileName) throws Exception { + try (OutputStream outputStream = new ByteArrayOutputStream(1024)) { + executor = + new ScriptExecutor( + SessionContext.create( + new DefaultContext( + new Configuration(), Collections.emptyList()), + SessionHandle.create(), + SessionEnvironment.newBuilder() + .setSessionEndpointVersion( + SqlGatewayRestAPIVersion.getDefaultVersion()) + .build(), + Executors.newDirectExecutorService()), + new Printer(outputStream)); + String input = Review Comment: I got the following error on my windows pc. ``` java.nio.file.InvalidPathException: Illegal char <:> at index 2: /C:/Users/jinfe/Documents/projects/flink/flink-table/flink-sql-gateway/target/test-classes/application/comment.q at java.base/sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182) at java.base/sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153) at java.base/sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77) at java.base/sun.nio.fs.WindowsPath.parse(WindowsPath.java:92) at java.base/sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:229) at java.base/java.nio.file.Path.of(Path.java:147) at java.base/java.nio.file.Paths.get(Paths.java:69) ``` We can change in to . ```java String input = IOUtils.toString( checkNotNull( AbstractSqlGatewayStatementITCase.class.getResourceAsStream( "/application/" + fileName)), StandardCharsets.UTF_8); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
