This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new cffbbf4  [FLINK-16217][sql-client] Fix exception catching to avoid SQL 
client crashes
cffbbf4 is described below

commit cffbbf4cb2c0f9dbd78642870c71391bc9afdb2f
Author: godfreyhe <[email protected]>
AuthorDate: Fri Mar 13 13:38:03 2020 +0800

    [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes
    
    This closes #11397.
---
 .../apache/flink/table/client/cli/CliClient.java   | 14 +++++-
 .../table/client/gateway/local/LocalExecutor.java  |  9 ++--
 .../flink/table/client/cli/CliClientTest.java      | 51 ++++++++++++----------
 .../flink/table/client/cli/TestingExecutor.java    | 34 ++++++++++++---
 .../table/client/cli/TestingExecutorBuilder.java   | 18 +++++++-
 .../table/client/cli/utils/TerminalUtils.java      |  5 ++-
 6 files changed, 96 insertions(+), 35 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 93b02c5..b93098b 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -375,7 +375,12 @@ public class CliClient {
        }
 
        private void callReset() {
-               executor.resetSessionProperties(sessionId);
+               try {
+                       executor.resetSessionProperties(sessionId);
+               } catch (SqlExecutionException e) {
+                       printExecutionException(e);
+                       return;
+               }
                printInfo(CliStrings.MESSAGE_RESET);
        }
 
@@ -402,7 +407,12 @@ public class CliClient {
                }
                // set a property
                else {
-                       executor.setSessionProperty(sessionId, 
cmdCall.operands[0], cmdCall.operands[1].trim());
+                       try {
+                               executor.setSessionProperty(sessionId, 
cmdCall.operands[0], cmdCall.operands[1].trim());
+                       } catch (SqlExecutionException e) {
+                               printExecutionException(e);
+                               return;
+                       }
                        
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
                }
                terminal.flush();
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5caa2a9..665e85d 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -66,8 +66,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.JarUtils;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
-
 import org.apache.commons.cli.Options;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -288,7 +286,12 @@ public class LocalExecutor implements Executor {
        public void setSessionProperty(String sessionId, String key, String 
value) throws SqlExecutionException {
                ExecutionContext<?> context = getExecutionContext(sessionId);
                Environment env = context.getEnvironment();
-               Environment newEnv = Environment.enrich(env, 
ImmutableMap.of(key, value), ImmutableMap.of());
+               Environment newEnv;
+               try {
+                       newEnv = Environment.enrich(env, 
Collections.singletonMap(key, value), Collections.emptyMap());
+               } catch (Throwable t) {
+                       throw new SqlExecutionException("Could not set session 
property.", t);
+               }
 
                // Renew the ExecutionContext by new environment.
                // Book keep all the session states of current ExecutionContext 
then
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 351852b..38f554f 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.cli.utils.SqlParserHelper;
 import org.apache.flink.table.client.cli.utils.TerminalUtils;
+import org.apache.flink.table.client.cli.utils.TerminalUtils.MockOutputStream;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
@@ -49,7 +50,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -115,17 +115,11 @@ public class CliClientTest extends TestLogger {
                        })
                        .build();
                InputStream inputStream = new ByteArrayInputStream("use 
db;\n".getBytes());
-               // don't care about the output
-               OutputStream outputStream = new OutputStream() {
-                       @Override
-                       public void write(int b) throws IOException {
-                       }
-               };
                SessionContext session = new SessionContext("test-session", new 
Environment());
                String sessionId = executor.openSession(session);
 
                CliClient cliClient = null;
-               try (Terminal terminal = new DumbTerminal(inputStream, 
outputStream)) {
+               try (Terminal terminal = new DumbTerminal(inputStream, new 
MockOutputStream())) {
                        cliClient = new CliClient(terminal, sessionId, 
executor, File.createTempFile("history", "tmp").toPath());
 
                        cliClient.open();
@@ -146,17 +140,11 @@ public class CliClientTest extends TestLogger {
                        .build();
 
                InputStream inputStream = new ByteArrayInputStream("use catalog 
cat;\n".getBytes());
-               // don't care about the output
-               OutputStream outputStream = new OutputStream() {
-                       @Override
-                       public void write(int b) throws IOException {
-                       }
-               };
                CliClient cliClient = null;
                SessionContext sessionContext = new 
SessionContext("test-session", new Environment());
                String sessionId = executor.openSession(sessionContext);
 
-               try (Terminal terminal = new DumbTerminal(inputStream, 
outputStream)) {
+               try (Terminal terminal = new DumbTerminal(inputStream, new 
MockOutputStream())) {
                        cliClient = new CliClient(terminal, sessionId, 
executor, File.createTempFile("history", "tmp").toPath());
                        cliClient.open();
                        assertThat(executor.getNumUseCatalogCalls(), is(1));
@@ -226,15 +214,8 @@ public class CliClientTest extends TestLogger {
                String sessionId = mockExecutor.openSession(context);
 
                InputStream inputStream = new ByteArrayInputStream("help;\nuse 
catalog cat;\n".getBytes());
-               // don't care about the output
-               OutputStream outputStream = new OutputStream() {
-                       @Override
-                       public void write(int b) throws IOException {
-                       }
-               };
-
                CliClient cliClient = null;
-               try (Terminal terminal = new DumbTerminal(inputStream, 
outputStream)) {
+               try (Terminal terminal = new DumbTerminal(inputStream, new 
MockOutputStream())) {
                        Path historyFilePath = File.createTempFile("history", 
"tmp").toPath();
                        cliClient = new CliClient(terminal, sessionId, 
mockExecutor, historyFilePath);
                        cliClient.open();
@@ -250,6 +231,30 @@ public class CliClientTest extends TestLogger {
        }
 
        @Test
+       public void testSetSessionPropertyWithException() throws Exception {
+               TestingExecutor executor = new TestingExecutorBuilder()
+                               .setSessionPropertiesFunction((ignored1, 
ignored2, ignored3) -> {
+                                       throw new 
SqlExecutionException("Property 'parallelism' must be a integer value but was: 
10a");
+                               })
+                               .build();
+               String output = testExecuteSql(executor, "set 
execution.parallelism = 10a;");
+               assertThat(executor.getNumSetSessionPropertyCalls(), is(1));
+               assertTrue(output.contains("Property 'parallelism' must be a 
integer value but was: 10a"));
+       }
+
+       @Test
+       public void testResetSessionPropertiesWithException() throws Exception {
+               TestingExecutor executor = new TestingExecutorBuilder()
+                               .resetSessionPropertiesFunction((ignored1) -> {
+                                       throw new SqlExecutionException("Failed 
to reset.");
+                               })
+                               .build();
+               String output = testExecuteSql(executor, "reset;");
+               assertThat(executor.getNumResetSessionPropertiesCalls(), is(1));
+               assertTrue(output.contains("Failed to reset."));
+       }
+
+       @Test
        public void testCreateCatalog() throws Exception {
                TestingExecutor executor = new 
TestingExecutorBuilder().setExecuteSqlConsumer((s, s2) -> null).build();
                testExecuteSql(executor, "create catalog c1 
with('type'='generic_in_memory');");
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
index 179cfb9..2635d4f 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
@@ -32,7 +32,9 @@ import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.TriFunctionWithException;
 
 import java.util.List;
 import java.util.Map;
@@ -57,10 +59,16 @@ class TestingExecutor implements Executor {
        private final BiConsumerWithException<String, String, 
SqlExecutionException> useCatalogConsumer;
 
        private int numUseDatabaseCalls = 0;
-       private BiConsumerWithException<String, String, SqlExecutionException> 
useDatabaseConsumer;
+       private final BiConsumerWithException<String, String, 
SqlExecutionException> useDatabaseConsumer;
 
        private int numExecuteSqlCalls = 0;
-       private BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> executeUpdateConsumer;
+       private final BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> executeSqlConsumer;
+
+       private int numSetSessionPropertyCalls = 0;
+       private final TriFunctionWithException<String, String, String, Void, 
SqlExecutionException> setSessionPropertyFunction;
+
+       private int numResetSessionPropertiesCalls = 0;
+       private final FunctionWithException<String, Void, 
SqlExecutionException> resetSessionPropertiesFunction;
 
        private final SqlParserHelper helper;
 
@@ -70,13 +78,17 @@ class TestingExecutor implements Executor {
                        List<SupplierWithException<List<Row>, 
SqlExecutionException>> resultPages,
                        BiConsumerWithException<String, String, 
SqlExecutionException> useCatalogConsumer,
                        BiConsumerWithException<String, String, 
SqlExecutionException> useDatabaseConsumer,
-                       BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> executeUpdateConsumer) {
+                       BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> executeSqlConsumer,
+                       TriFunctionWithException<String, String, String, Void, 
SqlExecutionException> setSessionPropertyFunction,
+                       FunctionWithException<String, Void, 
SqlExecutionException> resetSessionPropertiesFunction) {
                this.resultChanges = resultChanges;
                this.snapshotResults = snapshotResults;
                this.resultPages = resultPages;
                this.useCatalogConsumer = useCatalogConsumer;
                this.useDatabaseConsumer = useDatabaseConsumer;
-               this.executeUpdateConsumer = executeUpdateConsumer;
+               this.executeSqlConsumer = executeSqlConsumer;
+               this.setSessionPropertyFunction = setSessionPropertyFunction;
+               this.resetSessionPropertiesFunction = 
resetSessionPropertiesFunction;
                helper = new SqlParserHelper();
                helper.registerTables();
        }
@@ -133,10 +145,14 @@ class TestingExecutor implements Executor {
 
        @Override
        public void resetSessionProperties(String sessionId) throws 
SqlExecutionException {
+               numResetSessionPropertiesCalls++;
+               resetSessionPropertiesFunction.apply(sessionId);
        }
 
        @Override
        public void setSessionProperty(String sessionId, String key, String 
value) throws SqlExecutionException {
+               numSetSessionPropertyCalls++;
+               setSessionPropertyFunction.apply(sessionId, key, value);
        }
 
        @Override
@@ -183,7 +199,7 @@ class TestingExecutor implements Executor {
        @Override
        public TableResult executeSql(String sessionId, String statement) 
throws SqlExecutionException {
                numExecuteSqlCalls++;
-               return executeUpdateConsumer.apply(sessionId, statement);
+               return executeSqlConsumer.apply(sessionId, statement);
        }
 
        @Override
@@ -248,4 +264,12 @@ class TestingExecutor implements Executor {
        public int getNumExecuteSqlCalls() {
                return numExecuteSqlCalls;
        }
+
+       public int getNumSetSessionPropertyCalls() {
+               return numSetSessionPropertyCalls;
+       }
+
+       public int getNumResetSessionPropertiesCalls() {
+               return numResetSessionPropertiesCalls;
+       }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
index 66ba97f..979df81 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java
@@ -24,7 +24,9 @@ import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.TriFunctionWithException;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -41,6 +43,8 @@ class TestingExecutorBuilder {
        private BiConsumerWithException<String, String, SqlExecutionException> 
setUseCatalogConsumer = (ignoredA, ignoredB) -> {};
        private BiConsumerWithException<String, String, SqlExecutionException> 
setUseDatabaseConsumer = (ignoredA, ignoredB) -> {};
        private BiFunctionWithException<String, String, TableResult, 
SqlExecutionException> setExecuteSqlConsumer = (ignoredA, ignoredB) -> null;
+       private TriFunctionWithException<String, String, String, Void, 
SqlExecutionException> setSessionPropertyFunction = (ignoredA, ignoredB, 
ignoredC) -> null;
+       private FunctionWithException<String, Void, SqlExecutionException> 
resetSessionPropertiesFunction = (ignoredA) -> null;
 
        @SafeVarargs
        public final TestingExecutorBuilder 
setResultChangesSupplier(SupplierWithException<TypedResult<List<Tuple2<Boolean, 
Row>>>, SqlExecutionException> ... resultChangesSupplier) {
@@ -76,6 +80,16 @@ class TestingExecutorBuilder {
                return this;
        }
 
+       public final TestingExecutorBuilder 
setSessionPropertiesFunction(TriFunctionWithException<String, String, String, 
Void, SqlExecutionException> setSessionPropertyFunction) {
+               this.setSessionPropertyFunction = setSessionPropertyFunction;
+               return this;
+       }
+
+       public final TestingExecutorBuilder 
resetSessionPropertiesFunction(FunctionWithException<String, Void, 
SqlExecutionException> resetSessionPropertiesFunction) {
+               this.resetSessionPropertiesFunction = 
resetSessionPropertiesFunction;
+               return this;
+       }
+
        public TestingExecutor build() {
                return new TestingExecutor(
                        resultChangesSupplier,
@@ -83,6 +97,8 @@ class TestingExecutorBuilder {
                        resultPagesSupplier,
                        setUseCatalogConsumer,
                        setUseDatabaseConsumer,
-                       setExecuteSqlConsumer);
+                       setExecuteSqlConsumer,
+                       setSessionPropertyFunction,
+                       resetSessionPropertiesFunction);
        }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
index 0ee65cc..e4b9853 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java
@@ -54,7 +54,10 @@ public class TerminalUtils {
                }
        }
 
-       private static class MockOutputStream extends OutputStream {
+       /**
+        * A mock {@link OutputStream} for testing.
+        */
+       public static class MockOutputStream extends OutputStream {
 
                @Override
                public void write(int b) {

Reply via email to