This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 89be0b1 [FLINK-18059][sql-client] Fix create/drop catalog statements
that cannot be executed in the SQL client
89be0b1 is described below
commit 89be0b1c12da7a0c72f0db304479767db8f72488
Author: godfrey he <[email protected]>
AuthorDate: Fri Jun 5 15:13:09 2020 +0800
[FLINK-18059][sql-client] Fix create/drop catalog statements that cannot be
executed in the SQL client
This closes #12435
---
.../apache/flink/table/client/cli/CliClient.java | 111 +++++----------------
.../apache/flink/table/client/cli/CliStrings.java | 8 +-
.../flink/table/client/cli/CliClientTest.java | 14 +++
.../flink/table/client/cli/TestingExecutor.java | 15 ++-
.../table/client/cli/TestingExecutorBuilder.java | 12 ++-
5 files changed, 68 insertions(+), 92 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index ffb2d0e..b1acc76 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -313,10 +313,10 @@ public class CliClient {
callInsert(cmdCall);
break;
case CREATE_TABLE:
- callCreateTable(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_TABLE_CREATED);
break;
case DROP_TABLE:
- callDropTable(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_TABLE_REMOVED);
break;
case CREATE_VIEW:
callCreateView(cmdCall);
@@ -325,28 +325,37 @@ public class CliClient {
callDropView(cmdCall);
break;
case CREATE_FUNCTION:
- callCreateFunction(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_FUNCTION_CREATED);
break;
case DROP_FUNCTION:
- callDropFunction(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_FUNCTION_REMOVED);
break;
case ALTER_FUNCTION:
- callAlterFunction(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_ALTER_FUNCTION_SUCCEEDED,
+
CliStrings.MESSAGE_ALTER_FUNCTION_FAILED);
break;
case SOURCE:
callSource(cmdCall);
break;
case CREATE_DATABASE:
- callCreateDatabase(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_DATABASE_CREATED);
break;
case DROP_DATABASE:
- callDropDatabase(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_DATABASE_REMOVED);
break;
case ALTER_DATABASE:
- callAlterDatabase(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_ALTER_DATABASE_SUCCEEDED,
+
CliStrings.MESSAGE_ALTER_DATABASE_FAILED);
break;
case ALTER_TABLE:
- callAlterTable(cmdCall);
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_ALTER_TABLE_SUCCEEDED,
+
CliStrings.MESSAGE_ALTER_TABLE_FAILED);
+ break;
+ case CREATE_CATALOG:
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_CATALOG_CREATED);
+ break;
+ case DROP_CATALOG:
+ callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_CATALOG_REMOVED);
break;
default:
throw new SqlClientException("Unsupported
command: " + cmdCall.command);
@@ -583,24 +592,6 @@ public class CliClient {
return true;
}
- private void callCreateTable(SqlCommandCall cmdCall) {
- try {
- executor.createTable(sessionId, cmdCall.operands[0]);
- printInfo(CliStrings.MESSAGE_TABLE_CREATED);
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- }
- }
-
- private void callDropTable(SqlCommandCall cmdCall) {
- try {
- executor.dropTable(sessionId, cmdCall.operands[0]);
- printInfo(CliStrings.MESSAGE_TABLE_REMOVED);
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- }
- }
-
private void callCreateView(SqlCommandCall cmdCall) {
final String name = cmdCall.operands[0];
final String query = cmdCall.operands[1];
@@ -641,33 +632,6 @@ public class CliClient {
}
}
- private void callCreateFunction(SqlCommandCall cmdCall) {
- try {
- executor.executeSql(sessionId, cmdCall.operands[0]);
- printInfo(CliStrings.MESSAGE_FUNCTION_CREATED);
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- }
- }
-
- private void callDropFunction(SqlCommandCall cmdCall) {
- try {
- executor.executeSql(sessionId, cmdCall.operands[0]);
- printInfo(CliStrings.MESSAGE_FUNCTION_REMOVED);
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- }
- }
-
- private void callAlterFunction(SqlCommandCall cmdCall) {
- try {
- executor.executeSql(sessionId, cmdCall.operands[0]);
- printInfo(CliStrings.MESSAGE_ALTER_FUNCTION_SUCCEEDED);
- } catch (SqlExecutionException e) {
-
printExecutionException(CliStrings.MESSAGE_ALTER_FUNCTION_FAILED, e);
- }
- }
-
private void callSource(SqlCommandCall cmdCall) {
final String pathString = cmdCall.operands[0];
@@ -697,43 +661,16 @@ public class CliClient {
call.ifPresent(this::callCommand);
}
- private void callCreateDatabase(SqlCommandCall cmdCall) {
- final String createDatabaseStmt = cmdCall.operands[0];
- try {
- executor.executeUpdate(sessionId, createDatabaseStmt);
- printInfo(CliStrings.MESSAGE_DATABASE_CREATED);
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- }
- }
-
- private void callDropDatabase(SqlCommandCall cmdCall) {
- final String dropDatabaseStmt = cmdCall.operands[0];
- try {
- executor.executeUpdate(sessionId, dropDatabaseStmt);
- printInfo(CliStrings.MESSAGE_DATABASE_REMOVED);
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- }
- }
-
- private void callAlterDatabase(SqlCommandCall cmdCall) {
- final String alterDatabaseStmt = cmdCall.operands[0];
- try {
- executor.executeUpdate(sessionId, alterDatabaseStmt);
- printInfo(CliStrings.MESSAGE_DATABASE_ALTER_SUCCEEDED);
- } catch (SqlExecutionException e) {
-
printExecutionException(CliStrings.MESSAGE_DATABASE_ALTER_FAILED, e);
- }
+ private void callDdl(String ddl, String successMessage) {
+ callDdl(ddl, successMessage, null);
}
- private void callAlterTable(SqlCommandCall cmdCall) {
- final String alterTableStmt = cmdCall.operands[0];
+ private void callDdl(String ddl, String successMessage, String
errorMessage) {
try {
- executor.executeUpdate(sessionId, alterTableStmt);
- printInfo(CliStrings.MESSAGE_ALTER_TABLE_SUCCEEDED);
+ executor.executeSql(sessionId, ddl);
+ printInfo(successMessage);
} catch (SqlExecutionException e) {
-
printExecutionException(CliStrings.MESSAGE_ALTER_TABLE_FAILED, e);
+ printExecutionException(errorMessage, e);
}
}
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index fd04998..3a439e2 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -164,9 +164,13 @@ public final class CliStrings {
public static final String MESSAGE_DATABASE_REMOVED = "Database has
been removed.";
- public static final String MESSAGE_DATABASE_ALTER_SUCCEEDED = "Alter
database succeeded!";
+ public static final String MESSAGE_ALTER_DATABASE_SUCCEEDED = "Alter
database succeeded!";
- public static final String MESSAGE_DATABASE_ALTER_FAILED = "Alter
database failed!";
+ public static final String MESSAGE_ALTER_DATABASE_FAILED = "Alter
database failed!";
+
+ public static final String MESSAGE_CATALOG_CREATED = "Catalog has been
created.";
+
+ public static final String MESSAGE_CATALOG_REMOVED = "Catalog has been
removed.";
public static final String MESSAGE_VIEW_ALREADY_EXISTS = "A view with
this name has already been defined in the current CLI session.";
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 19d91ab..64e395f 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -226,6 +226,20 @@ public class CliClientTest extends TestLogger {
}
}
+ @Test
+ public void testCreateCatalog() throws Exception {
+ TestingExecutor executor = new
TestingExecutorBuilder().setExecuteSqlConsumer((s, s2) -> null).build();
+ testExecuteSql(executor, "create catalog c1
with('type'='generic_in_memory');");
+ assertThat(executor.getNumExecuteSqlCalls(), is(1));
+ }
+
+ @Test
+ public void testDropCatalog() throws Exception {
+ TestingExecutor executor = new
TestingExecutorBuilder().setExecuteSqlConsumer((s, s2) -> null).build();
+ testExecuteSql(executor, "drop catalog c1;");
+ assertThat(executor.getNumExecuteSqlCalls(), is(1));
+ }
+
//
--------------------------------------------------------------------------------------------
/**
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
index b22f8d7..179cfb9 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.types.Row;
import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import java.util.List;
@@ -58,6 +59,9 @@ class TestingExecutor implements Executor {
private int numUseDatabaseCalls = 0;
private BiConsumerWithException<String, String, SqlExecutionException>
useDatabaseConsumer;
+ private int numExecuteSqlCalls = 0;
+ private BiFunctionWithException<String, String, TableResult,
SqlExecutionException> executeUpdateConsumer;
+
private final SqlParserHelper helper;
TestingExecutor(
@@ -65,12 +69,14 @@ class TestingExecutor implements Executor {
List<SupplierWithException<TypedResult<Integer>,
SqlExecutionException>> snapshotResults,
List<SupplierWithException<List<Row>,
SqlExecutionException>> resultPages,
BiConsumerWithException<String, String,
SqlExecutionException> useCatalogConsumer,
- BiConsumerWithException<String, String,
SqlExecutionException> useDatabaseConsumer) {
+ BiConsumerWithException<String, String,
SqlExecutionException> useDatabaseConsumer,
+ BiFunctionWithException<String, String, TableResult,
SqlExecutionException> executeUpdateConsumer) {
this.resultChanges = resultChanges;
this.snapshotResults = snapshotResults;
this.resultPages = resultPages;
this.useCatalogConsumer = useCatalogConsumer;
this.useDatabaseConsumer = useDatabaseConsumer;
+ this.executeUpdateConsumer = executeUpdateConsumer;
helper = new SqlParserHelper();
helper.registerTables();
}
@@ -176,7 +182,8 @@ class TestingExecutor implements Executor {
@Override
public TableResult executeSql(String sessionId, String statement)
throws SqlExecutionException {
- return null;
+ numExecuteSqlCalls++;
+ return executeUpdateConsumer.apply(sessionId, statement);
}
@Override
@@ -237,4 +244,8 @@ class TestingExecutor implements Executor {
public int getNumUseDatabaseCalls() {
return numUseDatabaseCalls;
}
+
+ public int getNumExecuteSqlCalls() {
+ return numExecuteSqlCalls;
+ }
}
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
index e878964..66ba97f 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
@@ -18,10 +18,12 @@
package org.apache.flink.table.client.cli;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import java.util.Arrays;
@@ -38,6 +40,7 @@ class TestingExecutorBuilder {
private List<SupplierWithException<List<Row>, SqlExecutionException>>
resultPagesSupplier = Collections.emptyList();
private BiConsumerWithException<String, String, SqlExecutionException>
setUseCatalogConsumer = (ignoredA, ignoredB) -> {};
private BiConsumerWithException<String, String, SqlExecutionException>
setUseDatabaseConsumer = (ignoredA, ignoredB) -> {};
+ private BiFunctionWithException<String, String, TableResult,
SqlExecutionException> setExecuteSqlConsumer = (ignoredA, ignoredB) -> null;
@SafeVarargs
public final TestingExecutorBuilder
setResultChangesSupplier(SupplierWithException<TypedResult<List<Tuple2<Boolean,
Row>>>, SqlExecutionException> ... resultChangesSupplier) {
@@ -67,12 +70,19 @@ class TestingExecutorBuilder {
return this;
}
+ public final TestingExecutorBuilder setExecuteSqlConsumer(
+ BiFunctionWithException<String, String, TableResult,
SqlExecutionException> setExecuteUpdateConsumer) {
+ this.setExecuteSqlConsumer = setExecuteUpdateConsumer;
+ return this;
+ }
+
public TestingExecutor build() {
return new TestingExecutor(
resultChangesSupplier,
snapshotResultsSupplier,
resultPagesSupplier,
setUseCatalogConsumer,
- setUseDatabaseConsumer);
+ setUseDatabaseConsumer,
+ setExecuteSqlConsumer);
}
}