This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 6de64ac [FLINK-16217][sql-client] Fix exception catching to avoid SQL
client crashes
6de64ac is described below
commit 6de64ac202d1f0ac97df567208764e1547540c6e
Author: godfreyhe <[email protected]>
AuthorDate: Fri Mar 13 13:38:03 2020 +0800
[FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes
This closes #11397.
---
.../org/apache/flink/table/client/cli/CliClient.java | 14 ++++++++++++--
.../table/client/gateway/local/LocalExecutor.java | 9 ++++++---
.../apache/flink/table/client/cli/CliClientTest.java | 18 +++---------------
.../flink/table/client/cli/utils/TerminalUtils.java | 5 ++++-
4 files changed, 25 insertions(+), 21 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 797bca6..a0ec166 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -340,7 +340,12 @@ public class CliClient {
}
private void callReset() {
- executor.resetSessionProperties(sessionId);
+ try {
+ executor.resetSessionProperties(sessionId);
+ } catch (SqlExecutionException e) {
+ printExecutionException(e);
+ return;
+ }
printInfo(CliStrings.MESSAGE_RESET);
}
@@ -367,7 +372,12 @@ public class CliClient {
}
// set a property
else {
- executor.setSessionProperty(sessionId,
cmdCall.operands[0], cmdCall.operands[1]);
+ try {
+ executor.setSessionProperty(sessionId,
cmdCall.operands[0], cmdCall.operands[1].trim());
+ } catch (SqlExecutionException e) {
+ printExecutionException(e);
+ return;
+ }
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
}
terminal.flush();
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3665735..b59c6bf 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -63,8 +63,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.JarUtils;
import org.apache.flink.util.StringUtils;
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
-
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -282,7 +280,12 @@ public class LocalExecutor implements Executor {
public void setSessionProperty(String sessionId, String key, String
value) throws SqlExecutionException {
ExecutionContext<?> context = getExecutionContext(sessionId);
Environment env = context.getEnvironment();
- Environment newEnv = Environment.enrich(env,
ImmutableMap.of(key, value), ImmutableMap.of());
+ Environment newEnv;
+ try {
+ newEnv = Environment.enrich(env,
Collections.singletonMap(key, value), Collections.emptyMap());
+ } catch (Throwable t) {
+ throw new SqlExecutionException("Could not set session
property.", t);
+ }
// Renew the ExecutionContext by new environment.
// Book keep all the session states of current ExecutionContext
then
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 3016724..aa2c3a1 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.cli;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.cli.utils.TerminalUtils;
+import org.apache.flink.table.client.cli.utils.TerminalUtils.MockOutputStream;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.Executor;
@@ -44,7 +45,6 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -103,17 +103,11 @@ public class CliClientTest extends TestLogger {
Executor executor = mock(Executor.class);
doThrow(new SqlExecutionException("mocked
exception")).when(executor).useDatabase(any(), any());
InputStream inputStream = new ByteArrayInputStream("use
db;\n".getBytes());
- // don't care about the output
- OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- }
- };
SessionContext session = new SessionContext("test-session", new
Environment());
String sessionId = executor.openSession(session);
CliClient cliClient = null;
- try (Terminal terminal = new DumbTerminal(inputStream,
outputStream)) {
+ try (Terminal terminal = new DumbTerminal(inputStream, new
MockOutputStream())) {
cliClient = new CliClient(terminal, sessionId,
executor);
cliClient.open();
verify(executor).useDatabase(any(), any());
@@ -129,17 +123,11 @@ public class CliClientTest extends TestLogger {
Executor executor = mock(Executor.class);
doThrow(new SqlExecutionException("mocked
exception")).when(executor).useCatalog(any(), any());
InputStream inputStream = new ByteArrayInputStream("use catalog
cat;\n".getBytes());
- // don't care about the output
- OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- }
- };
CliClient cliClient = null;
SessionContext sessionContext = new
SessionContext("test-session", new Environment());
String sessionId = executor.openSession(sessionContext);
- try (Terminal terminal = new DumbTerminal(inputStream,
outputStream)) {
+ try (Terminal terminal = new DumbTerminal(inputStream, new
MockOutputStream())) {
cliClient = new CliClient(terminal, sessionId,
executor);
cliClient.open();
verify(executor).useCatalog(any(), any());
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
index 772fb01..d2b851d 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
@@ -50,7 +50,10 @@ public class TerminalUtils {
}
}
- private static class MockOutputStream extends OutputStream {
+ /**
+ * A mock {@link OutputStream} for testing.
+ */
+ public static class MockOutputStream extends OutputStream {
@Override
public void write(int b) {