This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 06688f345f6 [FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver
06688f345f6 is described below
commit 06688f345f6793a8964ec00002175f44cda13c33
Author: Shammon FY <[email protected]>
AuthorDate: Sat May 6 09:51:44 2023 +0800
[FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver
Close apache/flink#22533
---
.../client/cli/parser/SqlClientSyntaxHighlighter.java | 8 ++++----
.../apache/flink/table/client/gateway/Executor.java | 8 ++++++++
.../flink/table/client/gateway/ExecutorImpl.java | 11 ++++++++++-
.../apache/flink/table/client/cli/CliClientTest.java | 9 ++++++++-
.../table/gateway/service/context/DefaultContext.java | 5 +++++
flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 9 ---------
.../org/apache/flink/table/jdbc/FlinkConnection.java | 19 ++++++-------------
7 files changed, 41 insertions(+), 28 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java
index bdcfa5a707d..366cae3f57e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.client.cli.parser;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
@@ -57,16 +58,15 @@ public class SqlClientSyntaxHighlighter extends DefaultHighlighter {
@Override
public AttributedString highlight(LineReader reader, String buffer) {
+ ReadableConfig configuration = executor.getSessionConfig();
final SyntaxHighlightStyle.BuiltInStyle style =
SyntaxHighlightStyle.BuiltInStyle.fromString(
- executor.getSessionConfig()
-
.get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA));
+
configuration.get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA));
if (style == SyntaxHighlightStyle.BuiltInStyle.DEFAULT) {
return super.highlight(reader, buffer);
}
- final String dialectName =
-
executor.getSessionConfig().get(TableConfigOptions.TABLE_SQL_DIALECT);
+ final String dialectName =
configuration.get(TableConfigOptions.TABLE_SQL_DIALECT);
final SqlDialect dialect =
SqlDialect.HIVE.name().equalsIgnoreCase(dialectName)
? SqlDialect.HIVE
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 8414789c993..b636d326560 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
+import java.util.Map;
/** A gateway for communicating with Flink and other external systems. */
public interface Executor extends Closeable {
@@ -53,6 +54,13 @@ public interface Executor extends Closeable {
*/
ReadableConfig getSessionConfig();
+ /**
+ * Get the map configuration of the session.
+ *
+ * @return the map session configuration.
+ */
+ Map<String, String> getSessionConfigMap();
+
/**
* Execute statement.
*
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index fcdee3c779d..4ec7eb3e8eb 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -83,6 +83,7 @@ import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -206,6 +207,14 @@ public class ExecutorImpl implements Executor {
}
public ReadableConfig getSessionConfig() {
+ try {
+ return Configuration.fromMap(getSessionConfigMap());
+ } catch (Exception e) {
+ throw new SqlExecutionException("Failed to get the get session
config.", e);
+ }
+ }
+
+ public Map<String, String> getSessionConfigMap() {
try {
GetSessionConfigResponseBody response =
getResponse(
@@ -213,7 +222,7 @@ public class ExecutorImpl implements Executor {
GetSessionConfigHeaders.getInstance(),
new
SessionMessageParameters(sessionHandle),
EmptyRequestBody.getInstance()));
- return Configuration.fromMap(response.getProperties());
+ return response.getProperties();
} catch (Exception e) {
throw new SqlExecutionException("Failed to get the get session
config.", e);
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 9ce1fc1c808..bbfdc64b094 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.client.cli;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
@@ -61,6 +62,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static
org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
@@ -406,10 +408,15 @@ class CliClientTest {
public void configureSession(String statement) {}
@Override
- public Configuration getSessionConfig() {
+ public ReadableConfig getSessionConfig() {
return configuration;
}
+ @Override
+ public Map<String, String> getSessionConfigMap() {
+ return configuration.toMap();
+ }
+
@Override
public StatementResult executeStatement(String statement) {
receivedStatement = statement;
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index a48699d737f..b914ad4e7f1 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -42,6 +42,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -52,6 +53,10 @@ public class DefaultContext {
private final Configuration flinkConfig;
private final List<URL> dependencies;
+ public DefaultContext(Map<String, String> flinkConfig, List<URL>
dependencies) {
+ this(Configuration.fromMap(flinkConfig), dependencies);
+ }
+
public DefaultContext(Configuration flinkConfig, List<URL> dependencies) {
this.flinkConfig = flinkConfig;
this.dependencies = dependencies;
diff --git a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
index 7bdc62b30f0..cafbe51032d 100644
--- a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
+++ b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
@@ -83,14 +83,6 @@
<build>
<plugins>
- <plugin>
- <groupId>io.github.zentol.japicmp</groupId>
- <artifactId>japicmp-maven-plugin</artifactId>
- <configuration>
- <!-- TODO this should be removed after
get rid of flink core in issue
https://issues.apache.org/jira/browse/FLINK-31687. -->
- <skip>true</skip>
- </configuration>
- </plugin>
<!-- Build flink-sql-gateway jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -112,7 +104,6 @@
<include>org.apache.flink:flink-sql-gateway</include>
<include>org.apache.flink:flink-table-common</include>
<include>org.apache.flink:flink-annotations</include>
-
<include>org.apache.flink:flink-core</include>
</includes>
</artifactSet>
</configuration>
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
index 12203f94541..8c0f1a237c6 100644
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
+++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.jdbc;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.StatementResult;
import org.apache.flink.table.gateway.service.context.DefaultContext;
@@ -35,6 +34,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -53,13 +53,10 @@ public class FlinkConnection extends BaseConnection {
public FlinkConnection(DriverUri driverUri) {
this.url = driverUri.getURL();
this.statements = new ArrayList<>();
- // TODO Support default context from map to get gid of flink core for
jdbc driver in
- // https://issues.apache.org/jira/browse/FLINK-31687.
this.executor =
Executor.create(
new DefaultContext(
- Configuration.fromMap(
-
DriverUtils.fromProperties(driverUri.getProperties())),
+
DriverUtils.fromProperties(driverUri.getProperties()),
Collections.emptyList()),
driverUri.getAddress(),
UUID.randomUUID().toString());
@@ -165,20 +162,16 @@ public class FlinkConnection extends BaseConnection {
@Override
public String getClientInfo(String name) throws SQLException {
ensureOpen();
- // TODO Executor should return Map<String, String> here to get rid of
flink core for jdbc
- // driver in https://issues.apache.org/jira/browse/FLINK-31687.
- Configuration configuration = (Configuration)
executor.getSessionConfig();
- return configuration.toMap().get(name);
+ Map<String, String> configuration = executor.getSessionConfigMap();
+ return configuration.get(name);
}
@Override
public Properties getClientInfo() throws SQLException {
ensureOpen();
Properties properties = new Properties();
- // TODO Executor should return Map<String, String> here to get rid of
flink core for jdbc
- // driver in https://issues.apache.org/jira/browse/FLINK-31687.
- Configuration configuration = (Configuration)
executor.getSessionConfig();
- configuration.toMap().forEach(properties::setProperty);
+ Map<String, String> configuration = executor.getSessionConfigMap();
+ configuration.forEach(properties::setProperty);
return properties;
}