This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 13618dd10c3 [FLINK-38087][sql-gateway] Improve OperationExecutor error
messages and Java stream usage
13618dd10c3 is described below
commit 13618dd10c38bb4a1169e91e9166abf814b977a1
Author: Mingliang Liu <[email protected]>
AuthorDate: Thu Jul 10 16:15:57 2025 -0700
[FLINK-38087][sql-gateway] Improve OperationExecutor error messages and
Java stream usage
The configureSession() error message does not include SET/RESET.
There are also some usages of Java stream which can be improved.
This closes #26781
---
.../service/operation/OperationExecutor.java | 143 +++++++++++++++------
.../gateway/service/SqlGatewayServiceITCase.java | 22 +++-
2 files changed, 116 insertions(+), 49 deletions(-)
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 9612b57fcca..acf5ac7d772 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -82,7 +82,9 @@ import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.UseOperation;
+import org.apache.flink.table.operations.UseCatalogOperation;
+import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.UseModulesOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
@@ -92,11 +94,32 @@ import
org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
import org.apache.flink.table.operations.command.ShowJobsOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
-import org.apache.flink.table.operations.ddl.AlterOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogCommentOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogOptionsOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation;
+import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterModelChangeOperation;
+import org.apache.flink.table.operations.ddl.AlterModelRenameOperation;
+import org.apache.flink.table.operations.ddl.AlterTableOperation;
+import org.apache.flink.table.operations.ddl.AlterViewOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
+import org.apache.flink.table.operations.ddl.CreateModelOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.table.operations.ddl.CreateViewOperation;
+import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.DropCatalogOperation;
+import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropModelOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
+import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
+import org.apache.flink.table.operations.ddl.DropViewOperation;
+import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import
org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.utils.DateTimeUtils;
@@ -114,8 +137,10 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -142,6 +167,9 @@ public class OperationExecutor {
private static final Logger LOG =
LoggerFactory.getLogger(OperationExecutor.class);
+ private static final Map<Class<?>, String>
SUPPORTED_CONFIG_SESSION_OPERATIONS =
+ createSupportedInitializationOperations();
+
protected final SessionContext sessionContext;
private final Configuration executionConfig;
@@ -180,28 +208,14 @@ public class OperationExecutor {
}
Operation op = parsedOperations.get(0);
- if (!(op instanceof SetOperation)
- && !(op instanceof ResetOperation)
- && !(op instanceof CreateOperation)
- && !(op instanceof DropOperation)
- && !(op instanceof UseOperation)
- && !(op instanceof AlterOperation)
- && !(op instanceof LoadModuleOperation)
- && !(op instanceof UnloadModuleOperation)
- && !(op instanceof AddJarOperation)) {
+ if (SUPPORTED_CONFIG_SESSION_OPERATIONS.keySet().stream()
+ .noneMatch(c -> c.isInstance(op))) {
throw new UnsupportedOperationException(
String.format(
- "Unsupported statement for configuring
session:%s\n"
- + "The configureSession API only supports
to execute statement of type "
- + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
- + "CREATE DATABASE, DROP DATABASE, ALTER
DATABASE, "
- + "CREATE FUNCTION, DROP FUNCTION, ALTER
FUNCTION, "
- + "CREATE CATALOG, DROP CATALOG, "
- + "USE CATALOG, USE [CATALOG.]DATABASE, "
- + "CREATE VIEW, DROP VIEW, "
- + "LOAD MODULE, UNLOAD MODULE, USE MODULE,
"
- + "ADD JAR.",
- statement));
+ "Unsupported statement for configuring session:
%s\n"
+ + "The configureSession API only supports
executing statements of type %s.",
+ statement,
+ String.join(", ",
SUPPORTED_CONFIG_SESSION_OPERATIONS.values())));
}
if (op instanceof SetOperation) {
@@ -290,8 +304,8 @@ public class OperationExecutor {
public Set<TableInfo> listTables(
String catalogName, String databaseName, Set<TableKind>
tableKinds) {
checkArgument(
- Arrays.asList(TableKind.TABLE,
TableKind.VIEW).containsAll(tableKinds),
- "Currently only support to list TABLE, VIEW or TABLE AND
VIEW.");
+ EnumSet.of(TableKind.TABLE,
TableKind.VIEW).containsAll(tableKinds),
+ "Currently only supports listing TABLE, VIEW or TABLE AND
VIEW.");
if (tableKinds.contains(TableKind.TABLE) &&
tableKinds.contains(TableKind.VIEW)) {
return listTables(catalogName, databaseName, true);
} else if (tableKinds.contains(TableKind.TABLE)) {
@@ -608,7 +622,7 @@ public class OperationExecutor {
// set a property
sessionContext.set(setOp.getKey().get().trim(),
setOp.getValue().get().trim());
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK,
false);
- } else if (!setOp.getKey().isPresent() &&
!setOp.getValue().isPresent()) {
+ } else if (setOp.getKey().isEmpty() && setOp.getValue().isEmpty()) {
// show all properties
Map<String, String> configMap =
tableEnv.getConfig().getConfiguration().toMap();
return ResultFetcher.fromResults(
@@ -743,23 +757,21 @@ public class OperationExecutor {
ObjectIdentifier.of(
catalogName,
databaseName, name),
TableKind.TABLE)));
- return Collections.unmodifiableSet(new HashSet<>(ans.values()));
+ return ans.values().stream().collect(Collectors.toUnmodifiableSet());
}
private Set<TableInfo> listViews(String catalogName, String databaseName) {
- return Collections.unmodifiableSet(
- sessionContext
- .getSessionState()
- .catalogManager
- .listViews(catalogName, databaseName)
- .stream()
- .map(
- name ->
- new TableInfo(
- ObjectIdentifier.of(
- catalogName,
databaseName, name),
- TableKind.VIEW))
- .collect(Collectors.toSet()));
+ return sessionContext
+ .getSessionState()
+ .catalogManager
+ .listViews(catalogName, databaseName)
+ .stream()
+ .map(
+ name ->
+ new TableInfo(
+ ObjectIdentifier.of(catalogName,
databaseName, name),
+ TableKind.VIEW))
+ .collect(Collectors.toUnmodifiableSet());
}
public ResultFetcher callStopJobOperation(
@@ -890,7 +902,7 @@ public class OperationExecutor {
}
});
- if (!jobStatusOp.isPresent()) {
+ if (jobStatusOp.isEmpty()) {
throw new SqlExecutionException(
String.format("Described job %s does not exist in the
cluster.", jobId));
}
@@ -945,6 +957,53 @@ public class OperationExecutor {
}
}
+ private static Map<Class<?>, String>
createSupportedInitializationOperations() {
+ // Use LinkedHashMap to preserve insertion order
+ Map<Class<?>, String> ops = new LinkedHashMap<>();
+ // Configuration operations
+ ops.put(SetOperation.class, "SET");
+ ops.put(ResetOperation.class, "RESET");
+ // CREATE operations
+ ops.put(CreateTableOperation.class, "CREATE TABLE");
+ ops.put(CreateViewOperation.class, "CREATE VIEW");
+ ops.put(CreateDatabaseOperation.class, "CREATE DATABASE");
+ ops.put(CreateCatalogFunctionOperation.class, "CREATE FUNCTION");
+ ops.put(CreateTempSystemFunctionOperation.class, "CREATE TEMPORARY
SYSTEM FUNCTION");
+ ops.put(CreateCatalogOperation.class, "CREATE CATALOG");
+ ops.put(CreateModelOperation.class, "CREATE MODEL");
+ ops.put(CreateMaterializedTableOperation.class, "CREATE MATERIALIZED
TABLE");
+ // DROP operations
+ ops.put(DropTableOperation.class, "DROP TABLE");
+ ops.put(DropViewOperation.class, "DROP VIEW");
+ ops.put(DropDatabaseOperation.class, "DROP DATABASE");
+ ops.put(DropCatalogFunctionOperation.class, "DROP FUNCTION");
+ ops.put(DropTempSystemFunctionOperation.class, "DROP TEMPORARY SYSTEM
FUNCTION");
+ ops.put(DropCatalogOperation.class, "DROP CATALOG");
+ ops.put(DropModelOperation.class, "DROP MODEL");
+ ops.put(DropMaterializedTableOperation.class, "DROP MATERIALIZED
TABLE");
+ // ALTER operations
+ ops.put(AlterTableOperation.class, "ALTER TABLE");
+ ops.put(AlterViewOperation.class, "ALTER VIEW");
+ ops.put(AlterDatabaseOperation.class, "ALTER DATABASE");
+ ops.put(AlterCatalogFunctionOperation.class, "ALTER FUNCTION");
+ ops.put(AlterCatalogOptionsOperation.class, "ALTER CATALOG SET");
+ ops.put(AlterCatalogResetOperation.class, "ALTER CATALOG RESET");
+ ops.put(AlterCatalogCommentOperation.class, "ALTER CATALOG COMMENT");
+ ops.put(AlterModelChangeOperation.class, "ALTER MODEL");
+ ops.put(AlterModelRenameOperation.class, "ALTER MODEL");
+ ops.put(AlterMaterializedTableOperation.class, "ALTER MATERIALIZED
TABLE");
+ // USE operations
+ ops.put(UseCatalogOperation.class, "USE CATALOG");
+ ops.put(UseDatabaseOperation.class, "USE [CATALOG.]DATABASE");
+ ops.put(UseModulesOperation.class, "USE MODULES");
+ // Module operations
+ ops.put(LoadModuleOperation.class, "LOAD MODULE");
+ ops.put(UnloadModuleOperation.class, "UNLOAD MODULE");
+ // AddJar operation
+ ops.put(AddJarOperation.class, "ADD JAR");
+ return Collections.unmodifiableMap(ops);
+ }
+
/**
* Internal interface to encapsulate cluster actions which are executed
via the {@link
* ClusterClient}.
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index e2536b0447b..d64ed8f35f8 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -981,13 +981,21 @@ public class SqlGatewayServiceITCase {
.satisfies(
FlinkAssertions.anyCauseMatches(
UnsupportedOperationException.class,
- "Unsupported statement for configuring
session:SELECT 1;\n"
- + "The configureSession API only
supports to execute statement of type "
- + "CREATE TABLE, DROP TABLE, ALTER
TABLE, "
- + "CREATE DATABASE, DROP DATABASE,
ALTER DATABASE, "
- + "CREATE FUNCTION, DROP FUNCTION,
ALTER FUNCTION, "
- + "CREATE CATALOG, DROP CATALOG, USE
CATALOG, USE [CATALOG.]DATABASE, "
- + "CREATE VIEW, DROP VIEW, LOAD
MODULE, UNLOAD MODULE, USE MODULE, ADD JAR."));
+ "Unsupported statement for configuring
session: SELECT 1;\n"
+ + "The configureSession API only
supports executing statements of type "
+ + "SET, RESET, "
+ + "CREATE TABLE, CREATE VIEW, CREATE
DATABASE, "
+ + "CREATE FUNCTION, CREATE TEMPORARY
SYSTEM FUNCTION, "
+ + "CREATE CATALOG, CREATE MODEL,
CREATE MATERIALIZED TABLE, "
+ + "DROP TABLE, DROP VIEW, DROP
DATABASE, DROP FUNCTION, "
+ + "DROP TEMPORARY SYSTEM FUNCTION,
DROP CATALOG, "
+ + "DROP MODEL, DROP MATERIALIZED
TABLE, "
+ + "ALTER TABLE, ALTER VIEW, ALTER
DATABASE, ALTER FUNCTION, "
+ + "ALTER CATALOG SET, ALTER CATALOG
RESET, ALTER CATALOG COMMENT, "
+ + "ALTER MODEL, ALTER MODEL, ALTER
MATERIALIZED TABLE, "
+ + "USE CATALOG, USE
[CATALOG.]DATABASE, USE MODULES, "
+ + "LOAD MODULE, UNLOAD MODULE, "
+ + "ADD JAR"));
}
@Test