This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 4a5a720 [FLINK-15073][sql client] Sql client fails to run same query
multiple times
4a5a720 is described below
commit 4a5a720024992c12bbfd4fb316d04f24d23a109e
Author: yuzhao.cyz <[email protected]>
AuthorDate: Mon Dec 9 14:46:44 2019 +0800
[FLINK-15073][sql client] Sql client fails to run same query multiple times
After the SQL-CLI was changed to be stateful in FLINK-14672, each query's
temporary table was left behind, so the same object (from the same query)
could not be registered again.
This closes #10523
(cherry picked from commit 7bf96cf5fbd76377f5054c3b3f6552615a94c11d)
---
.../table/client/gateway/local/LocalExecutor.java | 12 +-
.../client/gateway/local/LocalExecutorITCase.java | 121 +++++++++++++++++++++
2 files changed, 130 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index c9f50d0..1174b09 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -619,16 +619,16 @@ public class LocalExecutor implements Executor {
removeTimeAttributes(table.getSchema()),
context.getExecutionConfig(),
context.getClassLoader());
-
final String jobName = sessionId + ": " + query;
+ final String tableName = String.format("_tmp_table_%s",
Math.abs(query.hashCode()));
final Pipeline pipeline;
try {
// writing to a sink requires an optimization step that
might reference UDFs during code compilation
context.wrapClassLoader(() -> {
-
context.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
+
context.getTableEnvironment().registerTableSink(tableName,
result.getTableSink());
table.insertInto(
context.getQueryConfig(),
- jobName);
+ tableName);
return null;
});
pipeline = context.createPipeline(jobName,
context.getFlinkConfig());
@@ -638,6 +638,12 @@ public class LocalExecutor implements Executor {
result.close();
// catch everything such that the query does not crash
the executor
throw new SqlExecutionException("Invalid SQL query.",
t);
+ } finally {
+ // Remove the temporal table object.
+ context.wrapClassLoader(() -> {
+
context.getTableEnvironment().dropTemporaryTable(tableName);
+ return null;
+ });
}
// store the result with a unique id
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index c7a19bf..10c797f 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -450,6 +450,50 @@ public class LocalExecutorITCase extends TestLogger {
}
}
+ @Test(timeout = 90_000L)
+ public void testStreamQueryExecutionChangelogMultipleTimes() throws
Exception {
+ final URL url =
getClass().getClassLoader().getResource("test-data.csv");
+ Objects.requireNonNull(url);
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", planner);
+ replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_RESULT_MODE", "changelog");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+
+ final Executor executor = createModifiedExecutor(clusterClient,
replaceVars);
+ final SessionContext session = new
SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ assertEquals("test-session", sessionId);
+
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("(true,47,Hello World)");
+ expectedResults.add("(true,27,Hello World)");
+ expectedResults.add("(true,37,Hello World)");
+ expectedResults.add("(true,37,Hello World)");
+ expectedResults.add("(true,47,Hello World)");
+ expectedResults.add("(true,57,Hello World!!!!)");
+
+ try {
+ for (int i = 0; i < 3; i++) {
+ // start job and retrieval
+ final ResultDescriptor desc =
executor.executeQuery(
+ sessionId,
+ "SELECT
scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
+
+ assertFalse(desc.isMaterialized());
+
+ final List<String> actualResults =
+
retrieveChangelogResult(executor, sessionId, desc.getResultId());
+
+
TestBaseUtils.compareResultCollections(expectedResults, actualResults,
Comparator.naturalOrder());
+ }
+ } finally {
+ executor.closeSession(sessionId);
+ }
+ }
+
@Test(timeout = 30_000L)
public void testStreamQueryExecutionTable() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
@@ -476,6 +520,43 @@ public class LocalExecutorITCase extends TestLogger {
executeStreamQueryTable(replaceVars, query, expectedResults);
}
+ @Test(timeout = 90_000L)
+ public void testStreamQueryExecutionTableMultipleTimes() throws
Exception {
+ final URL url =
getClass().getClassLoader().getResource("test-data.csv");
+ Objects.requireNonNull(url);
+
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", planner);
+ replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_RESULT_MODE", "table");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+
+ final String query = "SELECT scalarUDF(IntegerField1),
StringField1 FROM TableNumber1";
+
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("47,Hello World");
+ expectedResults.add("27,Hello World");
+ expectedResults.add("37,Hello World");
+ expectedResults.add("37,Hello World");
+ expectedResults.add("47,Hello World");
+ expectedResults.add("57,Hello World!!!!");
+
+ final Executor executor = createModifiedExecutor(clusterClient,
replaceVars);
+ final SessionContext session = new
SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ assertEquals("test-session", sessionId);
+
+ try {
+ for (int i = 0; i < 3; i++) {
+ executeStreamQueryTable(replaceVars, query,
expectedResults);
+ }
+ } finally {
+ executor.closeSession(sessionId);
+ }
+ }
+
@Test(timeout = 30_000L)
public void testStreamQueryExecutionLimitedTable() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
@@ -535,6 +616,46 @@ public class LocalExecutorITCase extends TestLogger {
}
}
+ @Test(timeout = 90_000L)
+ public void testBatchQueryExecutionMultipleTimes() throws Exception {
+ final URL url =
getClass().getClassLoader().getResource("test-data.csv");
+ Objects.requireNonNull(url);
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", planner);
+ replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+ replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
+ replaceVars.put("$VAR_RESULT_MODE", "table");
+ replaceVars.put("$VAR_UPDATE_MODE", "");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+
+ final Executor executor = createModifiedExecutor(clusterClient,
replaceVars);
+ final SessionContext session = new
SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ assertEquals("test-session", sessionId);
+
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("47");
+ expectedResults.add("27");
+ expectedResults.add("37");
+ expectedResults.add("37");
+ expectedResults.add("47");
+ expectedResults.add("57");
+
+ try {
+ for (int i = 0; i < 3; i++) {
+ final ResultDescriptor desc =
executor.executeQuery(sessionId, "SELECT * FROM TestView1");
+
+ assertTrue(desc.isMaterialized());
+
+ final List<String> actualResults =
retrieveTableResult(executor, sessionId, desc.getResultId());
+
+
TestBaseUtils.compareResultCollections(expectedResults, actualResults,
Comparator.naturalOrder());
+ }
+ } finally {
+ executor.closeSession(sessionId);
+ }
+ }
+
@Test(timeout = 30_000L)
public void ensureExceptionOnFaultySourceInStreamingChangelogMode()
throws Exception {
final String missingFileName = "missing-source";