This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 89be0b1  [FLINK-18059][sql-client] Fix create/drop catalog statements 
that cannot be executed in the SQL client
89be0b1 is described below

commit 89be0b1c12da7a0c72f0db304479767db8f72488
Author: godfrey he <[email protected]>
AuthorDate: Fri Jun 5 15:13:09 2020 +0800

    [FLINK-18059][sql-client] Fix create/drop catalog statements that cannot 
be executed in the SQL client
    
    This closes #12435
---
 .../apache/flink/table/client/cli/CliClient.java   | 111 +++++----------------
 .../apache/flink/table/client/cli/CliStrings.java  |   8 +-
 .../flink/table/client/cli/CliClientTest.java      |  14 +++
 .../flink/table/client/cli/TestingExecutor.java    |  15 ++-
 .../table/client/cli/TestingExecutorBuilder.java   |  12 ++-
 5 files changed, 68 insertions(+), 92 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index ffb2d0e..b1acc76 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -313,10 +313,10 @@ public class CliClient {
                                callInsert(cmdCall);
                                break;
                        case CREATE_TABLE:
-                               callCreateTable(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_TABLE_CREATED);
                                break;
                        case DROP_TABLE:
-                               callDropTable(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_TABLE_REMOVED);
                                break;
                        case CREATE_VIEW:
                                callCreateView(cmdCall);
@@ -325,28 +325,37 @@ public class CliClient {
                                callDropView(cmdCall);
                                break;
                        case CREATE_FUNCTION:
-                               callCreateFunction(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_FUNCTION_CREATED);
                                break;
                        case DROP_FUNCTION:
-                               callDropFunction(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_FUNCTION_REMOVED);
                                break;
                        case ALTER_FUNCTION:
-                               callAlterFunction(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_ALTER_FUNCTION_SUCCEEDED,
+                                               
CliStrings.MESSAGE_ALTER_FUNCTION_FAILED);
                                break;
                        case SOURCE:
                                callSource(cmdCall);
                                break;
                        case CREATE_DATABASE:
-                               callCreateDatabase(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_DATABASE_CREATED);
                                break;
                        case DROP_DATABASE:
-                               callDropDatabase(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_DATABASE_REMOVED);
                                break;
                        case ALTER_DATABASE:
-                               callAlterDatabase(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_ALTER_DATABASE_SUCCEEDED,
+                                               
CliStrings.MESSAGE_ALTER_DATABASE_FAILED);
                                break;
                        case ALTER_TABLE:
-                               callAlterTable(cmdCall);
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_ALTER_TABLE_SUCCEEDED,
+                                               
CliStrings.MESSAGE_ALTER_TABLE_FAILED);
+                               break;
+                       case CREATE_CATALOG:
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_CATALOG_CREATED);
+                               break;
+                       case DROP_CATALOG:
+                               callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_CATALOG_REMOVED);
                                break;
                        default:
                                throw new SqlClientException("Unsupported 
command: " + cmdCall.command);
@@ -583,24 +592,6 @@ public class CliClient {
                return true;
        }
 
-       private void callCreateTable(SqlCommandCall cmdCall) {
-               try {
-                       executor.createTable(sessionId, cmdCall.operands[0]);
-                       printInfo(CliStrings.MESSAGE_TABLE_CREATED);
-               } catch (SqlExecutionException e) {
-                       printExecutionException(e);
-               }
-       }
-
-       private void callDropTable(SqlCommandCall cmdCall) {
-               try {
-                       executor.dropTable(sessionId, cmdCall.operands[0]);
-                       printInfo(CliStrings.MESSAGE_TABLE_REMOVED);
-               } catch (SqlExecutionException e) {
-                       printExecutionException(e);
-               }
-       }
-
        private void callCreateView(SqlCommandCall cmdCall) {
                final String name = cmdCall.operands[0];
                final String query = cmdCall.operands[1];
@@ -641,33 +632,6 @@ public class CliClient {
                }
        }
 
-       private void callCreateFunction(SqlCommandCall cmdCall) {
-               try {
-                       executor.executeSql(sessionId, cmdCall.operands[0]);
-                       printInfo(CliStrings.MESSAGE_FUNCTION_CREATED);
-               } catch (SqlExecutionException e) {
-                       printExecutionException(e);
-               }
-       }
-
-       private void callDropFunction(SqlCommandCall cmdCall) {
-               try {
-                       executor.executeSql(sessionId, cmdCall.operands[0]);
-                       printInfo(CliStrings.MESSAGE_FUNCTION_REMOVED);
-               } catch (SqlExecutionException e) {
-                       printExecutionException(e);
-               }
-       }
-
-       private void callAlterFunction(SqlCommandCall cmdCall) {
-               try {
-                       executor.executeSql(sessionId, cmdCall.operands[0]);
-                       printInfo(CliStrings.MESSAGE_ALTER_FUNCTION_SUCCEEDED);
-               } catch (SqlExecutionException e) {
-                       
printExecutionException(CliStrings.MESSAGE_ALTER_FUNCTION_FAILED, e);
-               }
-       }
-
        private void callSource(SqlCommandCall cmdCall) {
                final String pathString = cmdCall.operands[0];
 
@@ -697,43 +661,16 @@ public class CliClient {
                call.ifPresent(this::callCommand);
        }
 
-       private void callCreateDatabase(SqlCommandCall cmdCall) {
-               final String createDatabaseStmt = cmdCall.operands[0];
-               try {
-                       executor.executeUpdate(sessionId, createDatabaseStmt);
-                       printInfo(CliStrings.MESSAGE_DATABASE_CREATED);
-               } catch (SqlExecutionException e) {
-                       printExecutionException(e);
-               }
-       }
-
-       private void callDropDatabase(SqlCommandCall cmdCall) {
-               final String dropDatabaseStmt = cmdCall.operands[0];
-               try {
-                       executor.executeUpdate(sessionId, dropDatabaseStmt);
-                       printInfo(CliStrings.MESSAGE_DATABASE_REMOVED);
-               } catch (SqlExecutionException e) {
-                       printExecutionException(e);
-               }
-       }
-
-       private void callAlterDatabase(SqlCommandCall cmdCall) {
-               final String alterDatabaseStmt = cmdCall.operands[0];
-               try {
-                       executor.executeUpdate(sessionId, alterDatabaseStmt);
-                       printInfo(CliStrings.MESSAGE_DATABASE_ALTER_SUCCEEDED);
-               } catch (SqlExecutionException e) {
-                       
printExecutionException(CliStrings.MESSAGE_DATABASE_ALTER_FAILED, e);
-               }
+       private void callDdl(String ddl, String successMessage) {
+               callDdl(ddl, successMessage, null);
        }
 
-       private void callAlterTable(SqlCommandCall cmdCall) {
-               final String alterTableStmt = cmdCall.operands[0];
+       private void callDdl(String ddl, String successMessage, String 
errorMessage) {
                try {
-                       executor.executeUpdate(sessionId, alterTableStmt);
-                       printInfo(CliStrings.MESSAGE_ALTER_TABLE_SUCCEEDED);
+                       executor.executeSql(sessionId, ddl);
+                       printInfo(successMessage);
                } catch (SqlExecutionException e) {
-                       
printExecutionException(CliStrings.MESSAGE_ALTER_TABLE_FAILED, e);
+                       printExecutionException(errorMessage, e);
                }
        }
 
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index fd04998..3a439e2 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -164,9 +164,13 @@ public final class CliStrings {
 
        public static final String MESSAGE_DATABASE_REMOVED = "Database has 
been removed.";
 
-       public static final String MESSAGE_DATABASE_ALTER_SUCCEEDED = "Alter 
database succeeded!";
+       public static final String MESSAGE_ALTER_DATABASE_SUCCEEDED = "Alter 
database succeeded!";
 
-       public static final String MESSAGE_DATABASE_ALTER_FAILED = "Alter 
database failed!";
+       public static final String MESSAGE_ALTER_DATABASE_FAILED = "Alter 
database failed!";
+
+       public static final String MESSAGE_CATALOG_CREATED = "Catalog has been 
created.";
+
+       public static final String MESSAGE_CATALOG_REMOVED = "Catalog has been 
removed.";
 
        public static final String MESSAGE_VIEW_ALREADY_EXISTS = "A view with 
this name has already been defined in the current CLI session.";
 
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 19d91ab..64e395f 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -226,6 +226,20 @@ public class CliClientTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testCreateCatalog() throws Exception {
+               TestingExecutor executor = new 
TestingExecutorBuilder().setExecuteSqlConsumer((s, s2) -> null).build();
+               testExecuteSql(executor, "create catalog c1 
with('type'='generic_in_memory');");
+               assertThat(executor.getNumExecuteSqlCalls(), is(1));
+       }
+
+       @Test
+       public void testDropCatalog() throws Exception {
+               TestingExecutor executor = new 
TestingExecutorBuilder().setExecuteSqlConsumer((s, s2) -> null).build();
+               testExecuteSql(executor, "drop catalog c1;");
+               assertThat(executor.getNumExecuteSqlCalls(), is(1));
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
index b22f8d7..179cfb9 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.util.List;
@@ -58,6 +59,9 @@ class TestingExecutor implements Executor {
        private int numUseDatabaseCalls = 0;
        private BiConsumerWithException<String, String, SqlExecutionException> 
useDatabaseConsumer;
 
+       private int numExecuteSqlCalls = 0;
+       private BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> executeUpdateConsumer;
+
        private final SqlParserHelper helper;
 
        TestingExecutor(
@@ -65,12 +69,14 @@ class TestingExecutor implements Executor {
                        List<SupplierWithException<TypedResult<Integer>, 
SqlExecutionException>> snapshotResults,
                        List<SupplierWithException<List<Row>, 
SqlExecutionException>> resultPages,
                        BiConsumerWithException<String, String, 
SqlExecutionException> useCatalogConsumer,
-                       BiConsumerWithException<String, String, 
SqlExecutionException> useDatabaseConsumer) {
+                       BiConsumerWithException<String, String, 
SqlExecutionException> useDatabaseConsumer,
+                       BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> executeUpdateConsumer) {
                this.resultChanges = resultChanges;
                this.snapshotResults = snapshotResults;
                this.resultPages = resultPages;
                this.useCatalogConsumer = useCatalogConsumer;
                this.useDatabaseConsumer = useDatabaseConsumer;
+               this.executeUpdateConsumer = executeUpdateConsumer;
                helper = new SqlParserHelper();
                helper.registerTables();
        }
@@ -176,7 +182,8 @@ class TestingExecutor implements Executor {
 
        @Override
        public TableResult executeSql(String sessionId, String statement) 
throws SqlExecutionException {
-               return null;
+               numExecuteSqlCalls++;
+               return executeUpdateConsumer.apply(sessionId, statement);
        }
 
        @Override
@@ -237,4 +244,8 @@ class TestingExecutor implements Executor {
        public int getNumUseDatabaseCalls() {
                return numUseDatabaseCalls;
        }
+
+       public int getNumExecuteSqlCalls() {
+               return numExecuteSqlCalls;
+       }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
index e878964..66ba97f 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
@@ -18,10 +18,12 @@
 package org.apache.flink.table.client.cli;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.util.Arrays;
@@ -38,6 +40,7 @@ class TestingExecutorBuilder {
        private List<SupplierWithException<List<Row>, SqlExecutionException>> 
resultPagesSupplier = Collections.emptyList();
        private BiConsumerWithException<String, String, SqlExecutionException> 
setUseCatalogConsumer = (ignoredA, ignoredB) -> {};
        private BiConsumerWithException<String, String, SqlExecutionException> 
setUseDatabaseConsumer = (ignoredA, ignoredB) -> {};
+       private BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> setExecuteSqlConsumer = (ignoredA, ignoredB) -> null;
 
        @SafeVarargs
        public final TestingExecutorBuilder 
setResultChangesSupplier(SupplierWithException<TypedResult<List<Tuple2<Boolean, 
Row>>>, SqlExecutionException> ... resultChangesSupplier) {
@@ -67,12 +70,19 @@ class TestingExecutorBuilder {
                return this;
        }
 
+       public final TestingExecutorBuilder setExecuteSqlConsumer(
+                       BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> setExecuteUpdateConsumer) {
+               this.setExecuteSqlConsumer = setExecuteUpdateConsumer;
+               return this;
+       }
+
        public TestingExecutor build() {
                return new TestingExecutor(
                        resultChangesSupplier,
                        snapshotResultsSupplier,
                        resultPagesSupplier,
                        setUseCatalogConsumer,
-                       setUseDatabaseConsumer);
+                       setUseDatabaseConsumer,
+                       setExecuteSqlConsumer);
        }
 }

Reply via email to