This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 80cfeb8  [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI
80cfeb8 is described below

commit 80cfeb84dfa479fd1a41dde8863f88eca8a796ca
Author: Rui Li <[email protected]>
AuthorDate: Fri Jan 3 17:38:33 2020 +0800

    [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI
    
    closes #10759.
---
 .../main/java/org/apache/flink/table/client/cli/CliClient.java | 10 ++++++----
 .../java/org/apache/flink/table/client/cli/CliStrings.java     |  1 +
 .../org/apache/flink/table/client/cli/SqlCommandParser.java    |  4 ++++
 .../apache/flink/table/client/gateway/local/LocalExecutor.java |  4 ++--
 .../java/org/apache/flink/table/client/cli/CliClientTest.java  |  3 +++
 .../apache/flink/table/client/cli/SqlCommandParserTest.java    |  3 +++
 6 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 8b80373..797bca6 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -224,11 +224,12 @@ public class CliClient {
                terminal.flush();
 
                final Optional<SqlCommandCall> parsedStatement = 
parseCommand(statement);
-               // only support INSERT INTO
+               // only support INSERT INTO/OVERWRITE
                return parsedStatement.map(cmdCall -> {
                        switch (cmdCall.command) {
                                case INSERT_INTO:
-                                       return callInsertInto(cmdCall);
+                               case INSERT_OVERWRITE:
+                                       return callInsert(cmdCall);
                                default:
                                        
printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
                                        return false;
@@ -294,7 +295,8 @@ public class CliClient {
                                callSelect(cmdCall);
                                break;
                        case INSERT_INTO:
-                               callInsertInto(cmdCall);
+                       case INSERT_OVERWRITE:
+                               callInsert(cmdCall);
                                break;
                        case CREATE_TABLE:
                                callCreateTable(cmdCall);
@@ -528,7 +530,7 @@ public class CliClient {
                }
        }
 
-       private boolean callInsertInto(SqlCommandCall cmdCall) {
+       private boolean callInsert(SqlCommandCall cmdCall) {
                printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
 
                try {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index a04068f..77c7039 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -50,6 +50,7 @@ public final class CliStrings {
                .append(formatCommand(SqlCommand.EXPLAIN, "Describes the 
execution plan of a query or table with the given name."))
                .append(formatCommand(SqlCommand.HELP, "Prints the available 
commands."))
                .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the 
results of a SQL SELECT query into a declared table sink."))
+               .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the 
results of a SQL SELECT query into a declared table sink and overwrite existing 
data."))
                .append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI 
client."))
                .append(formatCommand(SqlCommand.RESET, "Resets all session 
configuration properties."))
                .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT 
query on the Flink cluster."))
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index a01b3c0..1387d18 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -127,6 +127,10 @@ public final class SqlCommandParser {
                        "(INSERT\\s+INTO.*)",
                        SINGLE_OPERAND),
 
+               INSERT_OVERWRITE(
+                       "(INSERT\\s+OVERWRITE.*)",
+                       SINGLE_OPERAND),
+
                CREATE_TABLE("(CREATE\\s+TABLE\\s+.*)", SINGLE_OPERAND),
 
                DROP_TABLE("(DROP\\s+TABLE\\s+.*)", SINGLE_OPERAND),
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 573aeae..513e215 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -112,7 +112,7 @@ public class LocalExecutor implements Executor {
        private final ResultStore resultStore;
 
        // insert into sql match pattern
-       public static final Pattern INSERT_INTO_SQL_PATTERN = 
Pattern.compile("(INSERT\\s+INTO.*)",
+       private static final Pattern INSERT_SQL_PATTERN = 
Pattern.compile("(INSERT\\s+(INTO|OVERWRITE).*)",
                        Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
        /**
@@ -576,7 +576,7 @@ public class LocalExecutor implements Executor {
                applyUpdate(context, context.getTableEnvironment(), 
context.getQueryConfig(), statement);
 
                //Todo: we should refactor following condition after 
TableEnvironment has support submit job directly.
-               if 
(!INSERT_INTO_SQL_PATTERN.matcher(statement.trim()).matches()) {
+               if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
                        return null;
                }
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 4bbe050..3016724 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -67,17 +67,20 @@ import static org.mockito.Mockito.verify;
 public class CliClientTest extends TestLogger {
 
        private static final String INSERT_INTO_STATEMENT = "INSERT INTO 
MyTable SELECT * FROM MyOtherTable";
+       private static final String INSERT_OVERWRITE_STATEMENT = "INSERT 
OVERWRITE MyTable SELECT * FROM MyOtherTable";
        private static final String SELECT_STATEMENT = "SELECT * FROM 
MyOtherTable";
 
        @Test
        public void testUpdateSubmission() {
                verifyUpdateSubmission(INSERT_INTO_STATEMENT, false, false);
+               verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, false, 
false);
        }
 
        @Test
        public void testFailedUpdateSubmission() {
                // fail at executor
                verifyUpdateSubmission(INSERT_INTO_STATEMENT, true, true);
+               verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, true, true);
 
                // fail early in client
                verifyUpdateSubmission(SELECT_STATEMENT, false, true);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 718a5cb..54e0a7d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -65,6 +65,9 @@ public class SqlCommandParserTest {
                        "INSERT INTO other SELECT 1+1",
                        new SqlCommandCall(SqlCommand.INSERT_INTO, new 
String[]{"INSERT INTO other SELECT 1+1"}));
                testValidSqlCommand(
+                       "INSERT OVERWRITE other SELECT 1+1",
+                       new SqlCommandCall(SqlCommand.INSERT_OVERWRITE, new 
String[]{"INSERT OVERWRITE other SELECT 1+1"}));
+               testValidSqlCommand(
                        "CREATE VIEW x AS SELECT 1+1",
                        new SqlCommandCall(SqlCommand.CREATE_VIEW, new 
String[]{"x", "SELECT 1+1"}));
                testValidSqlCommand(

Reply via email to