This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a20ed14 [FLINK-8864][sql-client] Add cli command history in sql client and make it avaible after restart a20ed14 is described below commit a20ed14374c75d95a0fda5d33bea7f902088d670 Author: Kurt Young <k...@apache.org> AuthorDate: Thu Apr 16 13:19:55 2020 +0800 [FLINK-8864][sql-client] Add cli command history in sql client and make it avaible after restart This closes #11765 --- .../org/apache/flink/table/client/SqlClient.java | 12 +++++- .../apache/flink/table/client/cli/CliClient.java | 18 ++++++-- .../apache/flink/table/client/cli/CliOptions.java | 9 +++- .../flink/table/client/cli/CliOptionsParser.java | 20 ++++++++- .../apache/flink/table/client/cli/CliUtils.java | 21 +++++++++ .../flink/table/client/cli/CliClientTest.java | 50 +++++++++++++++++++--- .../flink/table/client/cli/CliResultViewTest.java | 7 ++- 7 files changed, 123 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java index 599c580..3f68dc6 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java @@ -26,11 +26,14 @@ import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.local.LocalExecutor; +import org.apache.commons.lang3.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -119,7 +122,14 @@ public class SqlClient { private void openCli(String sessionId, Executor executor) { CliClient cli = null; try { - cli = new CliClient(sessionId, executor); + Path historyFilePath; + if (options.getHistoryFilePath() != null) { + historyFilePath = Paths.get(options.getHistoryFilePath()); + } else { + historyFilePath = Paths.get(System.getProperty("user.home"), + SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history"); + } + cli = new CliClient(sessionId, executor, historyFilePath); // interactive CLI mode if (options.getUpdateStatement() == null) { cli.open(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 3521de6..f2e4732a 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -83,7 +83,7 @@ public class CliClient { * afterwards using {@link #close()}. */ @VisibleForTesting - public CliClient(Terminal terminal, String sessionId, Executor executor) { + public CliClient(Terminal terminal, String sessionId, Executor executor, Path historyFilePath) { this.terminal = terminal; this.sessionId = sessionId; this.executor = executor; @@ -106,6 +106,18 @@ public class CliClient { lineReader.setVariable(LineReader.ERRORS, 1); // perform code completion case insensitive lineReader.option(LineReader.Option.CASE_INSENSITIVE, true); + // set history file path + if (Files.exists(historyFilePath) || CliUtils.createFile(historyFilePath)) { + String msg = "Command history file path: " + historyFilePath; + // print it in the command line as well as log file + System.out.println(msg); + LOG.info(msg); + lineReader.setVariable(LineReader.HISTORY_FILE, historyFilePath); + } else { + String msg = "Unable to create history file: " + historyFilePath; + System.out.println(msg); + LOG.warn(msg); + } // create prompt prompt = new AttributedStringBuilder() @@ -120,8 +132,8 @@ public class CliClient { * Creates a CLI instance with a prepared terminal. Make sure to close the CLI instance * afterwards using {@link #close()}. */ - public CliClient(String sessionId, Executor executor) { - this(createDefaultTerminal(), sessionId, executor); + public CliClient(String sessionId, Executor executor, Path historyFilePath) { + this(createDefaultTerminal(), sessionId, executor, historyFilePath); } public Terminal getTerminal() { diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java index 06a063a..4eaed62 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java @@ -34,6 +34,7 @@ public class CliOptions { private final List<URL> jars; private final List<URL> libraryDirs; private final String updateStatement; + private final String historyFilePath; public CliOptions( boolean isPrintHelp, @@ -42,7 +43,8 @@ public class CliOptions { URL defaults, List<URL> jars, List<URL> libraryDirs, - String updateStatement) { + String updateStatement, + String historyFilePath) { this.isPrintHelp = isPrintHelp; this.sessionId = sessionId; this.environment = environment; @@ -50,6 +52,7 @@ public class CliOptions { this.jars = jars; this.libraryDirs = libraryDirs; this.updateStatement = updateStatement; + this.historyFilePath = historyFilePath; } public boolean isPrintHelp() { @@ -79,4 +82,8 @@ public class CliOptions { public String getUpdateStatement() { return updateStatement; } + + public String getHistoryFilePath() { + return historyFilePath; + } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java index b924779..a7626bb 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java @@ -117,6 +117,17 @@ public class CliOptionsParser { "the target sink table.") .build(); + public static final Option OPTION_HISTORY = Option + .builder("hist") + .required(false) + .longOpt("history") + .numberOfArgs(1) + .argName("History file path") + .desc( + "The file which you want to save the command history into. If not specified, we will " + + "auto-generate one under your user's home directory.") + .build(); + private static final Options EMBEDDED_MODE_CLIENT_OPTIONS = getEmbeddedModeClientOptions(new Options()); private static final Options GATEWAY_MODE_CLIENT_OPTIONS = getGatewayModeClientOptions(new Options()); private static final Options GATEWAY_MODE_GATEWAY_OPTIONS = getGatewayModeGatewayOptions(new Options()); @@ -133,6 +144,7 @@ public class CliOptionsParser { options.addOption(OPTION_JAR); options.addOption(OPTION_LIBRARY); options.addOption(OPTION_UPDATE); + options.addOption(OPTION_HISTORY); return options; } @@ -141,6 +153,7 @@ public class CliOptionsParser { options.addOption(OPTION_SESSION); options.addOption(OPTION_ENVIRONMENT); options.addOption(OPTION_UPDATE); + options.addOption(OPTION_HISTORY); return options; } @@ -235,7 +248,8 @@ public class CliOptionsParser { checkUrl(line, CliOptionsParser.OPTION_DEFAULTS), checkUrls(line, CliOptionsParser.OPTION_JAR), checkUrls(line, CliOptionsParser.OPTION_LIBRARY), - line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()) + line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()), + line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()) ); } catch (ParseException e) { @@ -254,7 +268,8 @@ public class CliOptionsParser { null, checkUrls(line, CliOptionsParser.OPTION_JAR), checkUrls(line, CliOptionsParser.OPTION_LIBRARY), - line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()) + line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()), + line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()) ); } catch (ParseException e) { @@ -273,6 +288,7 @@ public class CliOptionsParser { checkUrl(line, CliOptionsParser.OPTION_DEFAULTS), checkUrls(line, CliOptionsParser.OPTION_JAR), checkUrls(line, CliOptionsParser.OPTION_LIBRARY), + null, null ); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java index 6b2161f..10836ca 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java @@ -29,6 +29,8 @@ import org.jline.utils.AttributedString; import org.jline.utils.AttributedStringBuilder; import org.jline.utils.AttributedStyle; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Iterator; @@ -142,4 +144,23 @@ public final class CliUtils { throw new RuntimeException("unknown UProperty.EAST_ASIAN_WIDTH: " + value); } } + + /** + * Create the file as well as the parent directory. + */ + public static boolean createFile(final Path filePath) { + try { + final Path parent = filePath.getParent(); + if (parent == null) { + return false; + } + Files.createDirectories(parent); + if (Files.notExists(filePath)) { + Files.createFile(filePath); + } + return true; + } catch (final Exception e) { + return false; + } + } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index 99819e1..d933b60 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -43,9 +43,12 @@ import org.jline.terminal.impl.DumbTerminal; import org.junit.Test; import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -70,13 +73,13 @@ public class CliClientTest extends TestLogger { private static final String SELECT_STATEMENT = "SELECT * FROM MyOtherTable"; @Test - public void testUpdateSubmission() { + public void testUpdateSubmission() throws Exception { verifyUpdateSubmission(INSERT_INTO_STATEMENT, false, false); verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, false, false); } @Test - public void testFailedUpdateSubmission() { + public void testFailedUpdateSubmission() throws Exception { // fail at executor verifyUpdateSubmission(INSERT_INTO_STATEMENT, true, true); verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, true, true); @@ -116,7 +119,8 @@ public class CliClientTest extends TestLogger { CliClient cliClient = null; try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) { - cliClient = new CliClient(terminal, sessionId, executor); + cliClient = new CliClient(terminal, sessionId, executor, File.createTempFile("history", "tmp").toPath()); + cliClient.open(); assertThat(executor.getNumUseDatabaseCalls(), is(1)); } finally { @@ -146,7 +150,7 @@ public class CliClientTest extends TestLogger { String sessionId = executor.openSession(sessionContext); try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) { - cliClient = new CliClient(terminal, sessionId, executor); + cliClient = new CliClient(terminal, sessionId, executor, File.createTempFile("history", "tmp").toPath()); cliClient.open(); assertThat(executor.getNumUseCatalogCalls(), is(1)); } finally { @@ -156,9 +160,39 @@ public class CliClientTest extends TestLogger { } } + @Test + public void testHistoryFile() throws Exception { + final SessionContext context = new SessionContext("test-session", new Environment()); + final MockExecutor mockExecutor = new MockExecutor(); + String sessionId = mockExecutor.openSession(context); + + InputStream inputStream = new ByteArrayInputStream("help;\nuse catalog cat;\n".getBytes()); + // don't care about the output + OutputStream outputStream = new OutputStream() { + @Override + public void write(int b) throws IOException { + } + }; + + CliClient cliClient = null; + try (Terminal terminal = new DumbTerminal(inputStream, outputStream)) { + Path historyFilePath = File.createTempFile("history", "tmp").toPath(); + cliClient = new CliClient(terminal, sessionId, mockExecutor, historyFilePath); + cliClient.open(); + List<String> content = Files.readAllLines(historyFilePath); + assertEquals(2, content.size()); + assertTrue(content.get(0).contains("help")); + assertTrue(content.get(1).contains("use catalog cat")); + } finally { + if (cliClient != null) { + cliClient.close(); + } + } + } + // -------------------------------------------------------------------------------------------- - private void verifyUpdateSubmission(String statement, boolean failExecution, boolean testFailure) { + private void verifyUpdateSubmission(String statement, boolean failExecution, boolean testFailure) throws Exception { final SessionContext context = new SessionContext("test-session", new Environment()); final MockExecutor mockExecutor = new MockExecutor(); @@ -167,7 +201,11 @@ public class CliClientTest extends TestLogger { CliClient cli = null; try { - cli = new CliClient(TerminalUtils.createDummyTerminal(), sessionId, mockExecutor); + cli = new CliClient( + TerminalUtils.createDummyTerminal(), + sessionId, + mockExecutor, + File.createTempFile("history", "tmp").toPath()); if (testFailure) { assertFalse(cli.submitUpdate(statement)); } else { diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java index 7d5bca0..165ac0a 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java @@ -34,6 +34,7 @@ import org.apache.flink.types.Row; import org.jline.utils.AttributedString; import org.junit.Test; +import java.io.File; import java.util.Collections; import java.util.List; import java.util.Map; @@ -92,7 +93,11 @@ public class CliResultViewTest { Thread resultViewRunner = null; CliClient cli = null; try { - cli = new CliClient(TerminalUtils.createDummyTerminal(), sessionId, executor); + cli = new CliClient( + TerminalUtils.createDummyTerminal(), + sessionId, + executor, + File.createTempFile("history", "tmp").toPath()); resultViewRunner = new Thread(new TestingCliResultView(cli, descriptor, isTableMode)); resultViewRunner.start(); } finally {