KurtYoung commented on a change in pull request #15366:
URL: https://github.com/apache/flink/pull/15366#discussion_r603735057
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -250,50 +253,78 @@ public void close() {
}
/**
- * Submits a SQL update statement and prints status information and/or
errors on the terminal.
+ * Submits content from Sql file and prints status information and/or
errors on the terminal.
*
- * @param statement SQL update statement
+ * @param content SQL file content
* @return flag to indicate if the submission was successful or not
*/
- public boolean submitUpdate(String statement) {
+ public boolean executeSqlFile(String content) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
- terminal.writer().println(new AttributedString(statement).toString());
- terminal.flush();
- final Optional<Operation> operation = parseCommand(statement);
- // only support INSERT INTO/OVERWRITE
- return operation
- .map(
- op -> {
- if (op instanceof CatalogSinkModifyOperation) {
- return callInsert((CatalogSinkModifyOperation)
op);
- } else {
- printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
- return false;
- }
- })
- .orElse(false);
+ for (String statement : CliStatementSplitter.splitContent(content)) {
+ terminal.writer().println(new
AttributedString(statement).toString());
+ terminal.flush();
+
+ if (!executeStatement(statement, ExecutionMode.NON_INTERACTIVE)) {
+ // cancel execution when meet error or ctrl + C;
+ return false;
+ }
+ }
+ return true;
}
//
--------------------------------------------------------------------------------------------
+ enum ExecutionMode {
+ INTERACTIVE,
+
+ NON_INTERACTIVE;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ private boolean executeStatement(String statement, ExecutionMode
executionMode) {
+ try {
+ final Optional<Operation> operation = parseCommand(statement);
+ operation.ifPresent(op -> callOperation(op, executionMode));
+ } catch (SqlExecutionException e) {
+ printExecutionException(e);
+ return false;
+ }
+ return true;
+ }
+
+ private void validate(Operation operation, ExecutionMode executionMode) {
+ ResultMode mode =
executor.getSessionConfig(sessionId).get(EXECUTION_RESULT_MODE);
+ if (operation instanceof QueryOperation
+ && executionMode.equals(ExecutionMode.NON_INTERACTIVE)
+ && !mode.equals(TABLEAU)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "In non-interactive mode, it only supports to use
%s as value of %s when execute query. Please add 'SET %s=%s;' in the sql file.",
+ TABLEAU,
+ EXECUTION_RESULT_MODE.key(),
+ EXECUTION_RESULT_MODE.key(),
+ TABLEAU));
+ }
+ }
+
private Optional<Operation> parseCommand(String stmt) {
// normalize
stmt = stmt.trim();
// remove ';' at the end
if (stmt.endsWith(";")) {
stmt = stmt.substring(0, stmt.length() - 1).trim();
}
- try {
- Operation operation = executor.parseStatement(sessionId, stmt);
- return Optional.of(operation);
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- }
- return Optional.empty();
+
+ Operation operation = executor.parseStatement(sessionId, stmt);
+ return Optional.of(operation);
}
- private void callOperation(Operation operation) {
+ private void callOperation(Operation operation, ExecutionMode mode) {
+
Review comment:
Please remove this blank line.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
##########
@@ -197,4 +219,14 @@ public void testVerboseErrorMessage() {
assertThat(output, containsString(error));
}
}
+
+ @Test
+ public void testExecuteSqlFile() throws IOException {
+ String[] args = new String[] {"-f", sqlFilePath};
+ SqlClient.main(args);
+ final URL url =
getClass().getClassLoader().getResource("sql-client-help-command.out");
+ final String help = FileUtils.readFileUtf8(new File(url.getFile()));
+ // TODO: escape test to trigger test
Review comment:
What is this TODO for?
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
##########
@@ -218,6 +258,81 @@ private Path historyTempFile() throws IOException {
return File.createTempFile("history", "tmp").toPath();
}
+ private String executeSqlFromContent(MockExecutor executor, String
content) throws IOException {
+ String sessionId = executor.openSession("test-session");
+ OutputStream outputStream = new ByteArrayOutputStream(256);
+ System.setOut(new PrintStream(outputStream));
+ try (Terminal terminal =
TerminalUtils.createDummyTerminal(outputStream);
+ CliClient client =
+ new CliClient(terminal, sessionId, executor,
historyTempFile(), null)) {
+ client.executeSqlFile(content);
+ }
+ return outputStream.toString();
+ }
+
+ private void verifyCancelExecution() throws Exception {
Review comment:
I don't think we need this method, since it's only called in a single
place.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org