This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a8fb572 [FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client a8fb572 is described below commit a8fb572054c5f2294b46200d64729bc3e20c301b Author: Rui Li <li...@apache.org> AuthorDate: Fri Aug 9 11:34:15 2019 +0800 [FLINK-13526][sql-client] Switching to a non existing catalog or database crashes sql-client Avoid crashing sql-client when switching to non-existing catalog or database. This closes #9399. --- .../table/client/gateway/local/LocalExecutor.java | 13 +++++- .../flink/table/client/cli/CliClientTest.java | 54 ++++++++++++++++++++++ .../client/gateway/local/LocalExecutorITCase.java | 23 +++++++++ 3 files changed, 88 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 3394df7..43d0dc3 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -39,6 +39,7 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.gateway.Executor; @@ -240,7 +241,11 @@ public class LocalExecutor implements Executor { context.wrapClassLoader(() -> { // Rely on TableEnvironment/CatalogManager to validate input - tableEnv.useCatalog(catalogName); + try { + tableEnv.useCatalog(catalogName); + } catch (CatalogException e) { + throw new SqlExecutionException("Failed to switch to catalog " + catalogName, e); + } session.setCurrentCatalog(catalogName); session.setCurrentDatabase(tableEnv.getCurrentDatabase()); return null; @@ -256,7 +261,11 @@ public class LocalExecutor implements Executor { context.wrapClassLoader(() -> { // Rely on TableEnvironment/CatalogManager to validate input - tableEnv.useDatabase(databaseName); + try { + tableEnv.useDatabase(databaseName); + } catch (CatalogException e) { + throw new SqlExecutionException("Failed to switch to database " + databaseName, e); + } session.setCurrentDatabase(databaseName); return null; }); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index 4ab1d41..7266bcb 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -37,9 +37,13 @@ import org.jline.reader.LineReaderBuilder; import org.jline.reader.ParsedLine; import org.jline.reader.Parser; import org.jline.terminal.Terminal; +import org.jline.terminal.impl.DumbTerminal; import org.junit.Test; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -49,6 +53,10 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; /** * Tests for the {@link CliClient}. @@ -84,6 +92,52 @@ public class CliClientTest extends TestLogger { verifySqlCompletion("show t ", 6, Collections.emptyList(), Collections.singletonList("SET")); } + @Test + public void testUseNonExistingDB() throws Exception { + Executor executor = mock(Executor.class); + doThrow(new SqlExecutionException("mocked exception")).when(executor).useDatabase(any(), any()); + InputStream inputStream = new ByteArrayInputStream("use db;\n".getBytes()); + // don't care about the output + OutputStream outputStream = new OutputStream() { + @Override + public void write(int b) throws IOException { + } + }; + CliClient cliClient = null; + try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) { + cliClient = new CliClient(terminal, new SessionContext("test-session", new Environment()), executor); + cliClient.open(); + verify(executor).useDatabase(any(), any()); + } finally { + if (cliClient != null) { + cliClient.close(); + } + } + } + + @Test + public void testUseNonExistingCatalog() throws Exception { + Executor executor = mock(Executor.class); + doThrow(new SqlExecutionException("mocked exception")).when(executor).useCatalog(any(), any()); + InputStream inputStream = new ByteArrayInputStream("use catalog cat;\n".getBytes()); + // don't care about the output + OutputStream outputStream = new OutputStream() { + @Override + public void write(int b) throws IOException { + } + }; + CliClient cliClient = null; + try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) { + cliClient = new CliClient(terminal, new SessionContext("test-session", new Environment()), executor); + cliClient.open(); + verify(executor).useCatalog(any(), any()); + } finally { + if (cliClient != null) { + cliClient.close(); + } + } + } + // -------------------------------------------------------------------------------------------- private void verifyUpdateSubmission(String statement, boolean failExecution, boolean testFailure) { diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 963314b..cbae581 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -51,7 +51,9 @@ import org.apache.flink.util.TestLogger; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -126,6 +128,9 @@ public class LocalExecutorITCase extends TestLogger { @Parameter public String planner; + @Rule + public ExpectedException exception = ExpectedException.none(); + @Test public void testValidateSession() throws Exception { final Executor executor = createDefaultExecutor(clusterClient); @@ -506,6 +511,24 @@ public class LocalExecutorITCase extends TestLogger { } } + @Test + public void testUseNonExistingDatabase() throws Exception { + final Executor executor = createDefaultExecutor(clusterClient); + final SessionContext session = new SessionContext("test-session", new Environment()); + + exception.expect(SqlExecutionException.class); + executor.useDatabase(session, "nonexistingdb"); + } + + @Test + public void testUseNonExistingCatalog() throws Exception { + final Executor executor = createDefaultExecutor(clusterClient); + final SessionContext session = new SessionContext("test-session", new Environment()); + + exception.expect(SqlExecutionException.class); + executor.useCatalog(session, "nonexistingcatalog"); + } + private void executeStreamQueryTable( Map<String, String> replaceVars, String query,