This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new cffbbf4 [FLINK-16217][sql-client] Fix exception catching to avoid SQL
client crashes
cffbbf4 is described below
commit cffbbf4cb2c0f9dbd78642870c71391bc9afdb2f
Author: godfreyhe <[email protected]>
AuthorDate: Fri Mar 13 13:38:03 2020 +0800
[FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes
This closes #11397.
---
.../apache/flink/table/client/cli/CliClient.java | 14 +++++-
.../table/client/gateway/local/LocalExecutor.java | 9 ++--
.../flink/table/client/cli/CliClientTest.java | 51 ++++++++++++----------
.../flink/table/client/cli/TestingExecutor.java | 34 ++++++++++++---
.../table/client/cli/TestingExecutorBuilder.java | 18 +++++++-
.../table/client/cli/utils/TerminalUtils.java | 5 ++-
6 files changed, 96 insertions(+), 35 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 93b02c5..b93098b 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -375,7 +375,12 @@ public class CliClient {
}
private void callReset() {
- executor.resetSessionProperties(sessionId);
+ try {
+ executor.resetSessionProperties(sessionId);
+ } catch (SqlExecutionException e) {
+ printExecutionException(e);
+ return;
+ }
printInfo(CliStrings.MESSAGE_RESET);
}
@@ -402,7 +407,12 @@ public class CliClient {
}
// set a property
else {
- executor.setSessionProperty(sessionId,
cmdCall.operands[0], cmdCall.operands[1].trim());
+ try {
+ executor.setSessionProperty(sessionId,
cmdCall.operands[0], cmdCall.operands[1].trim());
+ } catch (SqlExecutionException e) {
+ printExecutionException(e);
+ return;
+ }
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
}
terminal.flush();
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5caa2a9..665e85d 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -66,8 +66,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.JarUtils;
import org.apache.flink.util.StringUtils;
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
-
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -288,7 +286,12 @@ public class LocalExecutor implements Executor {
public void setSessionProperty(String sessionId, String key, String
value) throws SqlExecutionException {
ExecutionContext<?> context = getExecutionContext(sessionId);
Environment env = context.getEnvironment();
- Environment newEnv = Environment.enrich(env,
ImmutableMap.of(key, value), ImmutableMap.of());
+ Environment newEnv;
+ try {
+ newEnv = Environment.enrich(env,
Collections.singletonMap(key, value), Collections.emptyMap());
+ } catch (Throwable t) {
+ throw new SqlExecutionException("Could not set session
property.", t);
+ }
// Renew the ExecutionContext by new environment.
// Book keep all the session states of current ExecutionContext
then
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 351852b..38f554f 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.cli.utils.SqlParserHelper;
import org.apache.flink.table.client.cli.utils.TerminalUtils;
+import org.apache.flink.table.client.cli.utils.TerminalUtils.MockOutputStream;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.Executor;
@@ -49,7 +50,6 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -115,17 +115,11 @@ public class CliClientTest extends TestLogger {
})
.build();
InputStream inputStream = new ByteArrayInputStream("use
db;\n".getBytes());
- // don't care about the output
- OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- }
- };
SessionContext session = new SessionContext("test-session", new
Environment());
String sessionId = executor.openSession(session);
CliClient cliClient = null;
- try (Terminal terminal = new DumbTerminal(inputStream,
outputStream)) {
+ try (Terminal terminal = new DumbTerminal(inputStream, new
MockOutputStream())) {
cliClient = new CliClient(terminal, sessionId,
executor, File.createTempFile("history", "tmp").toPath());
cliClient.open();
@@ -146,17 +140,11 @@ public class CliClientTest extends TestLogger {
.build();
InputStream inputStream = new ByteArrayInputStream("use catalog
cat;\n".getBytes());
- // don't care about the output
- OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- }
- };
CliClient cliClient = null;
SessionContext sessionContext = new
SessionContext("test-session", new Environment());
String sessionId = executor.openSession(sessionContext);
- try (Terminal terminal = new DumbTerminal(inputStream,
outputStream)) {
+ try (Terminal terminal = new DumbTerminal(inputStream, new
MockOutputStream())) {
cliClient = new CliClient(terminal, sessionId,
executor, File.createTempFile("history", "tmp").toPath());
cliClient.open();
assertThat(executor.getNumUseCatalogCalls(), is(1));
@@ -226,15 +214,8 @@ public class CliClientTest extends TestLogger {
String sessionId = mockExecutor.openSession(context);
InputStream inputStream = new ByteArrayInputStream("help;\nuse
catalog cat;\n".getBytes());
- // don't care about the output
- OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- }
- };
-
CliClient cliClient = null;
- try (Terminal terminal = new DumbTerminal(inputStream,
outputStream)) {
+ try (Terminal terminal = new DumbTerminal(inputStream, new
MockOutputStream())) {
Path historyFilePath = File.createTempFile("history",
"tmp").toPath();
cliClient = new CliClient(terminal, sessionId,
mockExecutor, historyFilePath);
cliClient.open();
@@ -250,6 +231,30 @@ public class CliClientTest extends TestLogger {
}
@Test
+ public void testSetSessionPropertyWithException() throws Exception {
+ TestingExecutor executor = new TestingExecutorBuilder()
+ .setSessionPropertiesFunction((ignored1,
ignored2, ignored3) -> {
+ throw new
SqlExecutionException("Property 'parallelism' must be a integer value but was:
10a");
+ })
+ .build();
+ String output = testExecuteSql(executor, "set
execution.parallelism = 10a;");
+ assertThat(executor.getNumSetSessionPropertyCalls(), is(1));
+ assertTrue(output.contains("Property 'parallelism' must be a
integer value but was: 10a"));
+ }
+
+ @Test
+ public void testResetSessionPropertiesWithException() throws Exception {
+ TestingExecutor executor = new TestingExecutorBuilder()
+ .resetSessionPropertiesFunction((ignored1) -> {
+ throw new SqlExecutionException("Failed
to reset.");
+ })
+ .build();
+ String output = testExecuteSql(executor, "reset;");
+ assertThat(executor.getNumResetSessionPropertiesCalls(), is(1));
+ assertTrue(output.contains("Failed to reset."));
+ }
+
+ @Test
public void testCreateCatalog() throws Exception {
TestingExecutor executor = new
TestingExecutorBuilder().setExecuteSqlConsumer((s, s2) -> null).build();
testExecuteSql(executor, "create catalog c1
with('type'='generic_in_memory');");
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
index 179cfb9..2635d4f 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
@@ -32,7 +32,9 @@ import org.apache.flink.table.delegation.Parser;
import org.apache.flink.types.Row;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.TriFunctionWithException;
import java.util.List;
import java.util.Map;
@@ -57,10 +59,16 @@ class TestingExecutor implements Executor {
private final BiConsumerWithException<String, String,
SqlExecutionException> useCatalogConsumer;
private int numUseDatabaseCalls = 0;
- private BiConsumerWithException<String, String, SqlExecutionException>
useDatabaseConsumer;
+ private final BiConsumerWithException<String, String,
SqlExecutionException> useDatabaseConsumer;
private int numExecuteSqlCalls = 0;
- private BiFunctionWithException<String, String, TableResult,
SqlExecutionException> executeUpdateConsumer;
+ private final BiFunctionWithException<String, String, TableResult,
SqlExecutionException> executeSqlConsumer;
+
+ private int numSetSessionPropertyCalls = 0;
+ private final TriFunctionWithException<String, String, String, Void,
SqlExecutionException> setSessionPropertyFunction;
+
+ private int numResetSessionPropertiesCalls = 0;
+ private final FunctionWithException<String, Void,
SqlExecutionException> resetSessionPropertiesFunction;
private final SqlParserHelper helper;
@@ -70,13 +78,17 @@ class TestingExecutor implements Executor {
List<SupplierWithException<List<Row>,
SqlExecutionException>> resultPages,
BiConsumerWithException<String, String,
SqlExecutionException> useCatalogConsumer,
BiConsumerWithException<String, String,
SqlExecutionException> useDatabaseConsumer,
- BiFunctionWithException<String, String, TableResult,
SqlExecutionException> executeUpdateConsumer) {
+ BiFunctionWithException<String, String, TableResult,
SqlExecutionException> executeSqlConsumer,
+ TriFunctionWithException<String, String, String, Void,
SqlExecutionException> setSessionPropertyFunction,
+ FunctionWithException<String, Void,
SqlExecutionException> resetSessionPropertiesFunction) {
this.resultChanges = resultChanges;
this.snapshotResults = snapshotResults;
this.resultPages = resultPages;
this.useCatalogConsumer = useCatalogConsumer;
this.useDatabaseConsumer = useDatabaseConsumer;
- this.executeUpdateConsumer = executeUpdateConsumer;
+ this.executeSqlConsumer = executeSqlConsumer;
+ this.setSessionPropertyFunction = setSessionPropertyFunction;
+ this.resetSessionPropertiesFunction =
resetSessionPropertiesFunction;
helper = new SqlParserHelper();
helper.registerTables();
}
@@ -133,10 +145,14 @@ class TestingExecutor implements Executor {
@Override
public void resetSessionProperties(String sessionId) throws
SqlExecutionException {
+ numResetSessionPropertiesCalls++;
+ resetSessionPropertiesFunction.apply(sessionId);
}
@Override
public void setSessionProperty(String sessionId, String key, String
value) throws SqlExecutionException {
+ numSetSessionPropertyCalls++;
+ setSessionPropertyFunction.apply(sessionId, key, value);
}
@Override
@@ -183,7 +199,7 @@ class TestingExecutor implements Executor {
@Override
public TableResult executeSql(String sessionId, String statement)
throws SqlExecutionException {
numExecuteSqlCalls++;
- return executeUpdateConsumer.apply(sessionId, statement);
+ return executeSqlConsumer.apply(sessionId, statement);
}
@Override
@@ -248,4 +264,12 @@ class TestingExecutor implements Executor {
public int getNumExecuteSqlCalls() {
return numExecuteSqlCalls;
}
+
+ public int getNumSetSessionPropertyCalls() {
+ return numSetSessionPropertyCalls;
+ }
+
+ public int getNumResetSessionPropertiesCalls() {
+ return numResetSessionPropertiesCalls;
+ }
}
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
index 66ba97f..979df81 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
@@ -24,7 +24,9 @@ import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.TriFunctionWithException;
import java.util.Arrays;
import java.util.Collections;
@@ -41,6 +43,8 @@ class TestingExecutorBuilder {
private BiConsumerWithException<String, String, SqlExecutionException>
setUseCatalogConsumer = (ignoredA, ignoredB) -> {};
private BiConsumerWithException<String, String, SqlExecutionException>
setUseDatabaseConsumer = (ignoredA, ignoredB) -> {};
private BiFunctionWithException<String, String, TableResult,
SqlExecutionException> setExecuteSqlConsumer = (ignoredA, ignoredB) -> null;
+ private TriFunctionWithException<String, String, String, Void,
SqlExecutionException> setSessionPropertyFunction = (ignoredA, ignoredB,
ignoredC) -> null;
+ private FunctionWithException<String, Void, SqlExecutionException>
resetSessionPropertiesFunction = (ignoredA) -> null;
@SafeVarargs
public final TestingExecutorBuilder
setResultChangesSupplier(SupplierWithException<TypedResult<List<Tuple2<Boolean,
Row>>>, SqlExecutionException> ... resultChangesSupplier) {
@@ -76,6 +80,16 @@ class TestingExecutorBuilder {
return this;
}
+ public final TestingExecutorBuilder
setSessionPropertiesFunction(TriFunctionWithException<String, String, String,
Void, SqlExecutionException> setSessionPropertyFunction) {
+ this.setSessionPropertyFunction = setSessionPropertyFunction;
+ return this;
+ }
+
+ public final TestingExecutorBuilder
resetSessionPropertiesFunction(FunctionWithException<String, Void,
SqlExecutionException> resetSessionPropertiesFunction) {
+ this.resetSessionPropertiesFunction =
resetSessionPropertiesFunction;
+ return this;
+ }
+
public TestingExecutor build() {
return new TestingExecutor(
resultChangesSupplier,
@@ -83,6 +97,8 @@ class TestingExecutorBuilder {
resultPagesSupplier,
setUseCatalogConsumer,
setUseDatabaseConsumer,
- setExecuteSqlConsumer);
+ setExecuteSqlConsumer,
+ setSessionPropertyFunction,
+ resetSessionPropertiesFunction);
}
}
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
index 0ee65cc..e4b9853 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
@@ -54,7 +54,10 @@ public class TerminalUtils {
}
}
- private static class MockOutputStream extends OutputStream {
+ /**
+ * A mock {@link OutputStream} for testing.
+ */
+ public static class MockOutputStream extends OutputStream {
@Override
public void write(int b) {