This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 80cfeb8 [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI
80cfeb8 is described below
commit 80cfeb84dfa479fd1a41dde8863f88eca8a796ca
Author: Rui Li <[email protected]>
AuthorDate: Fri Jan 3 17:38:33 2020 +0800
[FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI
closes #10759.
---
.../main/java/org/apache/flink/table/client/cli/CliClient.java | 10 ++++++----
.../java/org/apache/flink/table/client/cli/CliStrings.java | 1 +
.../org/apache/flink/table/client/cli/SqlCommandParser.java | 4 ++++
.../apache/flink/table/client/gateway/local/LocalExecutor.java | 4 ++--
.../java/org/apache/flink/table/client/cli/CliClientTest.java | 3 +++
.../apache/flink/table/client/cli/SqlCommandParserTest.java | 3 +++
6 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 8b80373..797bca6 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -224,11 +224,12 @@ public class CliClient {
terminal.flush();
final Optional<SqlCommandCall> parsedStatement =
parseCommand(statement);
- // only support INSERT INTO
+ // only support INSERT INTO/OVERWRITE
return parsedStatement.map(cmdCall -> {
switch (cmdCall.command) {
case INSERT_INTO:
- return callInsertInto(cmdCall);
+ case INSERT_OVERWRITE:
+ return callInsert(cmdCall);
default:
printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
return false;
@@ -294,7 +295,8 @@ public class CliClient {
callSelect(cmdCall);
break;
case INSERT_INTO:
- callInsertInto(cmdCall);
+ case INSERT_OVERWRITE:
+ callInsert(cmdCall);
break;
case CREATE_TABLE:
callCreateTable(cmdCall);
@@ -528,7 +530,7 @@ public class CliClient {
}
}
- private boolean callInsertInto(SqlCommandCall cmdCall) {
+ private boolean callInsert(SqlCommandCall cmdCall) {
printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
try {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index a04068f..77c7039 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -50,6 +50,7 @@ public final class CliStrings {
.append(formatCommand(SqlCommand.EXPLAIN, "Describes the
execution plan of a query or table with the given name."))
.append(formatCommand(SqlCommand.HELP, "Prints the available
commands."))
.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the
results of a SQL SELECT query into a declared table sink."))
+ .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the
results of a SQL SELECT query into a declared table sink and overwrite existing
data."))
.append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI
client."))
.append(formatCommand(SqlCommand.RESET, "Resets all session
configuration properties."))
.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT
query on the Flink cluster."))
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index a01b3c0..1387d18 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -127,6 +127,10 @@ public final class SqlCommandParser {
"(INSERT\\s+INTO.*)",
SINGLE_OPERAND),
+ INSERT_OVERWRITE(
+ "(INSERT\\s+OVERWRITE.*)",
+ SINGLE_OPERAND),
+
CREATE_TABLE("(CREATE\\s+TABLE\\s+.*)", SINGLE_OPERAND),
DROP_TABLE("(DROP\\s+TABLE\\s+.*)", SINGLE_OPERAND),
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 573aeae..513e215 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -112,7 +112,7 @@ public class LocalExecutor implements Executor {
private final ResultStore resultStore;
// insert into sql match pattern
- public static final Pattern INSERT_INTO_SQL_PATTERN =
Pattern.compile("(INSERT\\s+INTO.*)",
+ private static final Pattern INSERT_SQL_PATTERN =
Pattern.compile("(INSERT\\s+(INTO|OVERWRITE).*)",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
/**
@@ -576,7 +576,7 @@ public class LocalExecutor implements Executor {
applyUpdate(context, context.getTableEnvironment(),
context.getQueryConfig(), statement);
//Todo: we should refactor following condition after
TableEnvironment has support submit job directly.
- if
(!INSERT_INTO_SQL_PATTERN.matcher(statement.trim()).matches()) {
+ if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
return null;
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 4bbe050..3016724 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -67,17 +67,20 @@ import static org.mockito.Mockito.verify;
public class CliClientTest extends TestLogger {
private static final String INSERT_INTO_STATEMENT = "INSERT INTO
MyTable SELECT * FROM MyOtherTable";
+ private static final String INSERT_OVERWRITE_STATEMENT = "INSERT
OVERWRITE MyTable SELECT * FROM MyOtherTable";
private static final String SELECT_STATEMENT = "SELECT * FROM
MyOtherTable";
@Test
public void testUpdateSubmission() {
verifyUpdateSubmission(INSERT_INTO_STATEMENT, false, false);
+ verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, false,
false);
}
@Test
public void testFailedUpdateSubmission() {
// fail at executor
verifyUpdateSubmission(INSERT_INTO_STATEMENT, true, true);
+ verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, true, true);
// fail early in client
verifyUpdateSubmission(SELECT_STATEMENT, false, true);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 718a5cb..54e0a7d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -65,6 +65,9 @@ public class SqlCommandParserTest {
"INSERT INTO other SELECT 1+1",
new SqlCommandCall(SqlCommand.INSERT_INTO, new
String[]{"INSERT INTO other SELECT 1+1"}));
testValidSqlCommand(
+ "INSERT OVERWRITE other SELECT 1+1",
+ new SqlCommandCall(SqlCommand.INSERT_OVERWRITE, new
String[]{"INSERT OVERWRITE other SELECT 1+1"}));
+ testValidSqlCommand(
"CREATE VIEW x AS SELECT 1+1",
new SqlCommandCall(SqlCommand.CREATE_VIEW, new
String[]{"x", "SELECT 1+1"}));
testValidSqlCommand(