This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a8fb572  [FLINK-13526][sql-client] Switching to a non existing catalog 
or database crashes sql-client
a8fb572 is described below

commit a8fb572054c5f2294b46200d64729bc3e20c301b
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Aug 9 11:34:15 2019 +0800

    [FLINK-13526][sql-client] Switching to a non existing catalog or database 
crashes sql-client
    
    Avoid crashing sql-client when switching to non-existing catalog or 
database.
    
    This closes #9399.
---
 .../table/client/gateway/local/LocalExecutor.java  | 13 +++++-
 .../flink/table/client/cli/CliClientTest.java      | 54 ++++++++++++++++++++++
 .../client/gateway/local/LocalExecutorITCase.java  | 23 +++++++++
 3 files changed, 88 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3394df7..43d0dc3 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
@@ -240,7 +241,11 @@ public class LocalExecutor implements Executor {
 
                context.wrapClassLoader(() -> {
                        // Rely on TableEnvironment/CatalogManager to validate 
input
-                       tableEnv.useCatalog(catalogName);
+                       try {
+                               tableEnv.useCatalog(catalogName);
+                       } catch (CatalogException e) {
+                               throw new SqlExecutionException("Failed to 
switch to catalog " + catalogName, e);
+                       }
                        session.setCurrentCatalog(catalogName);
                        
session.setCurrentDatabase(tableEnv.getCurrentDatabase());
                        return null;
@@ -256,7 +261,11 @@ public class LocalExecutor implements Executor {
 
                context.wrapClassLoader(() -> {
                        // Rely on TableEnvironment/CatalogManager to validate 
input
-                       tableEnv.useDatabase(databaseName);
+                       try {
+                               tableEnv.useDatabase(databaseName);
+                       } catch (CatalogException e) {
+                               throw new SqlExecutionException("Failed to 
switch to database " + databaseName, e);
+                       }
                        session.setCurrentDatabase(databaseName);
                        return null;
                });
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 4ab1d41..7266bcb 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -37,9 +37,13 @@ import org.jline.reader.LineReaderBuilder;
 import org.jline.reader.ParsedLine;
 import org.jline.reader.Parser;
 import org.jline.terminal.Terminal;
+import org.jline.terminal.impl.DumbTerminal;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,6 +53,10 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the {@link CliClient}.
@@ -84,6 +92,52 @@ public class CliClientTest extends TestLogger {
                verifySqlCompletion("show t ", 6, Collections.emptyList(), 
Collections.singletonList("SET"));
        }
 
+       @Test
+       public void testUseNonExistingDB() throws Exception {
+               Executor executor = mock(Executor.class);
+               doThrow(new SqlExecutionException("mocked 
exception")).when(executor).useDatabase(any(), any());
+               InputStream inputStream = new ByteArrayInputStream("use 
db;\n".getBytes());
+               // don't care about the output
+               OutputStream outputStream = new OutputStream() {
+                       @Override
+                       public void write(int b) throws IOException {
+                       }
+               };
+               CliClient cliClient = null;
+               try (Terminal terminal = new DumbTerminal(inputStream, 
outputStream)) {
+                       cliClient = new CliClient(terminal, new 
SessionContext("test-session", new Environment()), executor);
+                       cliClient.open();
+                       verify(executor).useDatabase(any(), any());
+               } finally {
+                       if (cliClient != null) {
+                               cliClient.close();
+                       }
+               }
+       }
+
+       @Test
+       public void testUseNonExistingCatalog() throws Exception {
+               Executor executor = mock(Executor.class);
+               doThrow(new SqlExecutionException("mocked 
exception")).when(executor).useCatalog(any(), any());
+               InputStream inputStream = new ByteArrayInputStream("use catalog 
cat;\n".getBytes());
+               // don't care about the output
+               OutputStream outputStream = new OutputStream() {
+                       @Override
+                       public void write(int b) throws IOException {
+                       }
+               };
+               CliClient cliClient = null;
+               try (Terminal terminal = new DumbTerminal(inputStream, 
outputStream)) {
+                       cliClient = new CliClient(terminal, new 
SessionContext("test-session", new Environment()), executor);
+                       cliClient.open();
+                       verify(executor).useCatalog(any(), any());
+               } finally {
+                       if (cliClient != null) {
+                               cliClient.close();
+                       }
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private void verifyUpdateSubmission(String statement, boolean 
failExecution, boolean testFailure) {
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 963314b..cbae581 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -51,7 +51,9 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -126,6 +128,9 @@ public class LocalExecutorITCase extends TestLogger {
        @Parameter
        public String planner;
 
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
+
        @Test
        public void testValidateSession() throws Exception {
                final Executor executor = createDefaultExecutor(clusterClient);
@@ -506,6 +511,24 @@ public class LocalExecutorITCase extends TestLogger {
                }
        }
 
+       @Test
+       public void testUseNonExistingDatabase() throws Exception {
+               final Executor executor = createDefaultExecutor(clusterClient);
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+
+               exception.expect(SqlExecutionException.class);
+               executor.useDatabase(session, "nonexistingdb");
+       }
+
+       @Test
+       public void testUseNonExistingCatalog() throws Exception {
+               final Executor executor = createDefaultExecutor(clusterClient);
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+
+               exception.expect(SqlExecutionException.class);
+               executor.useCatalog(session, "nonexistingcatalog");
+       }
+
        private void executeStreamQueryTable(
                        Map<String, String> replaceVars,
                        String query,

Reply via email to