This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new bab9f9c  Making Samza-Sql-Shell commands pluggable by adding CommandHandler (#1106)
bab9f9c is described below

commit bab9f9c9911589509bbc712a568770c7a4523967
Author: Shenoda Guirguis <[email protected]>
AuthorDate: Sun Jul 21 21:09:47 2019 -0700

    Making Samza-Sql-Shell commands pluggable by adding CommandHandler (#1106)
---
 samza-sql-shell/conf/shell-defaults.conf           |   1 +
 .../apache/samza/sql/client/cli/CliCommand.java    |  12 +-
 .../apache/samza/sql/client/cli/CliConstants.java  |   3 +-
 .../samza/sql/client/cli/CliEnvironment.java       |  72 +-
 .../samza/sql/client/cli/CliHighlighter.java       |   1 +
 .../org/apache/samza/sql/client/cli/CliShell.java  | 738 +++------------------
 .../org/apache/samza/sql/client/cli/CliView.java   |   2 +-
 .../java/org/apache/samza/sql/client/cli/Main.java |   7 +-
 .../samza/sql/client/cli/QueryResultLogView.java   |   2 +-
 .../client/{util => exceptions}/CliException.java  |   2 +-
 .../CommandHandlerException.java}                  |  15 +-
 .../ExecutorException.java                         |   2 +-
 .../CliShell.java => impl/CliCommandHandler.java}  | 525 +++++----------
 .../sql/client/{cli => impl}/CliCommandType.java   |  45 +-
 .../samza/sql/client/impl/SamzaExecutor.java       |   2 +
 ...{ExecutorException.java => CommandHandler.java} |  28 +-
 .../CliView.java => interfaces/CommandType.java}   |  24 +-
 .../interfaces/EnvironmentVariableHandlerImpl.java |   2 +-
 .../samza/sql/client/interfaces/SqlExecutor.java   |   1 +
 .../org/apache/samza/sql/client/util/CliUtil.java  |  26 +
 .../samza/sql/client/util/RandomAccessQueue.java   |   2 +
 .../samza/sql/client/impl/SamzaExecutorTest.java   |   2 +-
 22 files changed, 429 insertions(+), 1085 deletions(-)

diff --git a/samza-sql-shell/conf/shell-defaults.conf 
b/samza-sql-shell/conf/shell-defaults.conf
index 0c85e52..65b2e86 100644
--- a/samza-sql-shell/conf/shell-defaults.conf
+++ b/samza-sql-shell/conf/shell-defaults.conf
@@ -22,6 +22,7 @@
 # Example:
 shell.executor = org.apache.samza.sql.client.impl.SamzaExecutor
 samza.sql.output = compact
+# shell.commandhandlers = org.apache.samza.CustomCommandHandler1, com.linkedin.CustomCommandHandler2
 # samza.sql.system.kafka.address = localhost:2181
 # samza.sql.relSchemaProvider.config.schemaDir = /tmp/schemas/
 # samza.sql.ioResolver = config
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
index 7c6480b..0fc310e 100755
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
@@ -19,23 +19,25 @@
 
 package org.apache.samza.sql.client.cli;
 
+import org.apache.samza.sql.client.interfaces.CommandType;
+
 /**
  * A shell command containing command name and parameters.
  */
-class CliCommand {
-  private CliCommandType commandType;
+public class CliCommand {
+  private CommandType commandType;
   private String parameters;
 
-  public CliCommand(CliCommandType cmdType) {
+  public CliCommand(CommandType cmdType) {
     this.commandType = cmdType;
   }
 
-  public CliCommand(CliCommandType cmdType, String parameters) {
+  public CliCommand(CommandType cmdType, String parameters) {
     this(cmdType);
     this.parameters = parameters;
   }
 
-  public CliCommandType getCommandType() {
+  public CommandType getCommandType() {
     return commandType;
   }
 
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
index b20e614..ead0e9a 100755
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
@@ -22,7 +22,7 @@ package org.apache.samza.sql.client.cli;
 /**
  * Constant definitions for the shell.
  */
-class CliConstants {
+public class CliConstants {
   public static final String APP_NAME = "Samza SQL Shell";
   public static final String WINDOW_TITLE = "Samza SQL Shell";
   public static final String PROMPT_1ST = "Samza SQL";
@@ -32,6 +32,7 @@ class CliConstants {
   public static final String CONFIG_SHELL_PREFIX = "shell.";
   // Specifies the executor used by the shell
   public static final String CONFIG_EXECUTOR = "shell.executor";
+  public static final String CONFIG_COMMAND_HANDLER = "shell.commandhandlers";
   public static final String DEFAULT_EXECUTOR_CLASS = 
"org.apache.samza.sql.client.impl.SamzaExecutor";
 
   public static final String WELCOME_MESSAGE;
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
index 3cd9e55..def36f9 100644
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
@@ -19,30 +19,34 @@
 
 package org.apache.samza.sql.client.cli;
 
+import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.sql.client.exceptions.CommandHandlerException;
+import org.apache.samza.sql.client.interfaces.CommandHandler;
 import org.apache.samza.sql.client.interfaces.EnvironmentVariableHandler;
 import org.apache.samza.sql.client.interfaces.EnvironmentVariableSpecs;
-import org.apache.samza.sql.client.interfaces.ExecutorException;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
 import org.apache.samza.sql.client.interfaces.SqlExecutor;
-import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.exceptions.CliException;
 import org.apache.samza.sql.client.util.CliUtil;
 import org.apache.samza.sql.client.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * CliEnvironment handles "environment variables" that configures the shell 
behavior.
  */
-class CliEnvironment {
+public class CliEnvironment {
   private EnvironmentVariableHandler shellEnvHandler;
   private EnvironmentVariableHandler executorEnvHandler;
   private SqlExecutor executor;
+  private List<CommandHandler> commandHandlers;
   private Map<String, String> delayedExecutorVars;
 
   // shell.executor is special and is specifically handled by CliEnvironment
@@ -51,6 +55,7 @@ class CliEnvironment {
 
   CliEnvironment() {
     shellEnvHandler = new CliShellEnvironmentVariableHandler();
+    commandHandlers = new ArrayList<>();
   }
 
   /** Sets the value of an environment variable.
@@ -61,7 +66,8 @@ class CliEnvironment {
    * -1: invalid name
    * -2: invalid value
    */
-  int setEnvironmentVariable(String name, String value) throws 
ExecutorException{
+  public int setEnvironmentVariable(String name, String value) throws 
ExecutorException, CommandHandlerException {
+    name = name.toLowerCase();
     if(name.equals(CliConstants.CONFIG_EXECUTOR)) {
       createShellExecutor(value);
       activeExecutorClassName = value;
@@ -69,6 +75,14 @@ class CliEnvironment {
       return 0;
     }
 
+    if (name.equals(CliConstants.CONFIG_COMMAND_HANDLER)) {
+      List<String> commandHandlersNames = Arrays.asList(value.split(","));
+      for (String commandHandlerName : commandHandlersNames) {
+        createCommandHandler(commandHandlerName.trim());
+      }
+      return 0;
+    }
+
     EnvironmentVariableHandler handler = getAppropriateHandler(name);
     if(handler == null) {
       // Shell doesn't recognize this variable. There's no executor handler 
yet. Save for future executor
@@ -132,7 +146,7 @@ class CliEnvironment {
    * Gives CliEnvironment a chance to apply settings, especially during 
initialization, things like
    * making default values take effect
    */
-  public void finishInitialization() {
+  public void finishInitialization() throws CliException {
     if(executor == null) {
       try {
         createShellExecutor(CliConstants.DEFAULT_EXECUTOR_CLASS);
@@ -144,7 +158,7 @@ class CliEnvironment {
           }
           delayedExecutorVars = null;
         }
-      } catch (ExecutorException e) {
+      } catch (ExecutorException | CommandHandlerException e) {
         // Convert checked exception ExecutorException to an unchecked 
exception as
         // we have failed to create even the default executor thus not 
recoverable
         throw new CliException(e);
@@ -173,11 +187,39 @@ class CliEnvironment {
     return executor;
   }
 
+  public List<CommandHandler> getCommandHandlers() { return commandHandlers; }
+
   private void createShellExecutor(String executorClassName) throws 
ExecutorException {
     try {
-      Class<?> clazz = Class.forName(executorClassName);
+      executor = (SqlExecutor) createInstance(executorClassName);
+    } catch (ClassCastException e) {
+      String errMsg = String.format("Error trying to cast Object of class %s 
to SqlExecutor", executorClassName);
+      LOG.error(errMsg);
+      throw new ExecutorException(errMsg, e);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      throw new ExecutorException(e);
+    }
+  }
+
+  private void createCommandHandler(String handlerClassName) throws 
CommandHandlerException {
+    try {
+    commandHandlers.add((CommandHandler) createInstance(handlerClassName));
+    } catch (ClassCastException e) {
+      String errMsg = String.format("Error trying to cast Object of class %s 
to CommandHandler", handlerClassName);
+      LOG.error(errMsg);
+      throw new CommandHandlerException(errMsg, e);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      throw new CommandHandlerException(e);
+    }
+  }
+
+  private Object createInstance(String className) throws Exception {
+    try {
+      Class<?> clazz = Class.forName(className);
       Constructor<?> ctor = clazz.getConstructor();
-      executor = (SqlExecutor) ctor.newInstance();
+      return ctor.newInstance();
     } catch (ClassNotFoundException | NoSuchMethodException
             | IllegalAccessException | InstantiationException | 
InvocationTargetException e) {
       throw new ExecutorException(e);
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
index c01ed5c..c6ac6c7 100755
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.client.cli;
 
+import org.apache.samza.sql.client.impl.CliCommandType;
 import org.apache.samza.sql.client.util.CliUtil;
 import org.jline.reader.Highlighter;
 import org.jline.reader.LineReader;
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
index 6212d9b..51bb429 100755
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
@@ -19,13 +19,19 @@
 
 package org.apache.samza.sql.client.cli;
 
-import java.util.stream.Collectors;
-import org.apache.samza.sql.client.interfaces.*;
-import org.apache.samza.sql.client.util.CliException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.client.exceptions.CommandHandlerException;
+import org.apache.samza.sql.client.impl.CliCommandHandler;
+import org.apache.samza.sql.client.impl.CliCommandType;
+import org.apache.samza.sql.client.interfaces.CommandHandler;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.apache.samza.sql.client.exceptions.CliException;
 import org.apache.samza.sql.client.util.CliUtil;
-import org.apache.samza.sql.schema.SamzaSqlFieldType;
-import org.apache.samza.sql.schema.SqlFieldSchema;
-import org.apache.samza.sql.schema.SqlSchema;
 import org.jline.reader.EndOfFileException;
 import org.jline.reader.LineReader;
 import org.jline.reader.LineReaderBuilder;
@@ -40,38 +46,28 @@ import org.jline.utils.InfoCmp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
 /**
  * The shell UI.
  */
-class CliShell {
+public class CliShell {
   private static final Logger LOG = LoggerFactory.getLogger(CliShell.class);
   private final Terminal terminal;
   private final PrintWriter writer;
   private final LineReader lineReader;
   private final String firstPrompt;
   private SqlExecutor executor;
+  private List<CommandHandler> commandHandlers;
   private final ExecutionContext exeContext;
-  private CliEnvironment env;
   private boolean keepRunning = true;
-  private Map<Integer, String> executions = new TreeMap<>();
 
-  CliShell(CliEnvironment environment) throws ExecutorException{
+  CliShell(CliEnvironment environment) throws ExecutorException {
     if (environment == null) {
       throw new IllegalArgumentException();
     }
 
     // Terminal
     try {
-      terminal = TerminalBuilder.builder()
-              .name(CliConstants.WINDOW_TITLE)
-              .build();
+      terminal = 
TerminalBuilder.builder().name(CliConstants.WINDOW_TITLE).build();
     } catch (IOException e) {
       throw new CliException("Error when creating terminal", e);
     }
@@ -80,28 +76,35 @@ class CliShell {
     writer = terminal.writer();
 
     // LineReader
-    final DefaultParser parser = new DefaultParser()
-            .eofOnEscapedNewLine(true)
-            .eofOnUnclosedQuote(true);
+    final DefaultParser parser = new 
DefaultParser().eofOnEscapedNewLine(true).eofOnUnclosedQuote(true);
     lineReader = LineReaderBuilder.builder()
-            .appName(CliConstants.APP_NAME)
-            .terminal(terminal)
-            .parser(parser)
-            .highlighter(new CliHighlighter())
-            .completer(new StringsCompleter(CliCommandType.getAllCommands()))
-            .build();
+        .appName(CliConstants.APP_NAME)
+        .terminal(terminal)
+        .parser(parser)
+        .highlighter(new CliHighlighter())
+        .completer(new StringsCompleter(CliCommandType.getAllCommands()))
+        .build();
 
     // Command Prompt
-    firstPrompt = new AttributedStringBuilder()
-            .style(AttributedStyle.DEFAULT.foreground(AttributedStyle.YELLOW))
-            .append(CliConstants.PROMPT_1ST + CliConstants.PROMPT_1ST_END)
-            .toAnsi();
+    firstPrompt = new 
AttributedStringBuilder().style(AttributedStyle.DEFAULT.foreground(AttributedStyle.YELLOW))
+        .append(CliConstants.PROMPT_1ST + CliConstants.PROMPT_1ST_END)
+        .toAnsi();
 
     // Execution context and executor
-    env = environment;
-    executor = env.getExecutor();
+    executor = environment.getExecutor();
     exeContext = new ExecutionContext();
     executor.start(exeContext);
+
+    // Command handlers
+    if (commandHandlers == null) {
+      commandHandlers = new ArrayList<>();
+    }
+    commandHandlers.add(new CliCommandHandler());
+    commandHandlers.addAll(environment.getCommandHandlers());
+    for (CommandHandler commandHandler : commandHandlers) {
+      LOG.info("init commandHandler {}", commandHandler.getClass().getName());
+      commandHandler.init(this, environment, terminal, exeContext);
+    }
   }
 
   Terminal getTerminal() {
@@ -146,89 +149,40 @@ class CliShell {
         } catch (UserInterruptException e) {
           continue;
         } catch (EndOfFileException e) {
-          commandQuit();
+          keepRunning = false;
           break;
         }
 
         if (CliUtil.isNullOrEmpty(line))
           continue;
 
-        CliCommand command = parseLine(line);
-        if (command == null)
+        String helpCmdText = CliCommandType.HELP.getCommandName();
+        if (line.toUpperCase().startsWith(helpCmdText)) {
+          if (line.toLowerCase().trim().equals(helpCmdText)) {
+            printHelpMessage();
+          } else {
+            CommandAndHandler commandAndHandler = 
findHandlerForCommand(line.substring(helpCmdText.length()));
+            if (commandAndHandler.handler != null) {
+              
commandAndHandler.handler.handleCommand(commandAndHandler.handler.parseLine(line));
+            } else {
+              printHelpMessage();
+            }
+          }
           continue;
+        }
 
+        CommandAndHandler commandAndHandler = null;
         try {
-          switch (command.getCommandType()) {
-            case CLEAR:
-              commandClear();
-              break;
-
-            case DESCRIBE:
-              commandDescribe(command);
-              break;
-
-            case EXECUTE:
-              commandExecuteFile(command);
-              break;
-
-            case EXIT:
-            case QUIT:
-              commandQuit();
-              break;
-
-            case HELP:
-              commandHelp(command);
-              break;
-
-            case INSERT_INTO:
-              commandInsertInto(command);
-              break;
-
-            case LS:
-              commandLs(command);
-              break;
-
-            case RM:
-              commandRm(command);
-              break;
-
-            case SELECT:
-              commandSelect(command);
-              break;
-
-            case SET:
-              commandSet(command);
-              break;
-
-            case SHOW_FUNCTIONS:
-              commandShowFunctions(command);
-              break;
-
-            case SHOW_TABLES:
-              commandShowTables(command);
-              break;
-
-            case STOP:
-              commandStop(command);
-              break;
-
-            case VERSION:
-              commandVersion(command);
-              break;
-
-            case INVALID_COMMAND:
-              printHelpMessage();
-              break;
-
-            default:
-              writer.println("UNDER DEVELOPEMENT. Command:" + 
command.getCommandType());
-              writer.println("Parameters:" +
-                      (CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" 
: command.getParameters()));
-              writer.flush();
+          commandAndHandler = findHandlerForCommand(line);
+          if (commandAndHandler.handler == null) {
+            LOG.info("no commandHandler found for command {}", line);
+            printHelpMessage();
+            continue;
           }
-        } catch (ExecutorException e) {
+          keepRunning = 
commandAndHandler.handler.handleCommand(commandAndHandler.command);
+        } catch (CommandHandlerException e) {
           writer.println("Error: " + e);
-          LOG.error("Error in {}: ", command.getCommandType(), e);
+          LOG.error("Error in {}: ", 
commandAndHandler.command.getCommandType(), e);
           writer.flush();
         }
       }
@@ -253,10 +207,6 @@ class CliShell {
     }
   }
 
-  private void commandVersion(CliCommand command) {
-    printVersion();
-  }
-
   private void printVersion() {
     String version = String.format("Shell version %s, Executor is %s, version 
%s",
             this.getClass().getPackage().getImplementationVersion(),
@@ -265,564 +215,40 @@ class CliShell {
     writer.println(version);
   }
 
-  private void commandClear() {
-    clearScreen();
-  }
-
-  private void commandDescribe(CliCommand command) throws ExecutorException {
-    String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println(command.getCommandType().getUsage());
-      writer.flush();
-      return;
-    }
-
-    SqlSchema schema = executor.getTableSchema(exeContext, parameters);
-    List<String> lines = formatSchema4Display(schema);
-    for (String line : lines) {
-      writer.println(line);
-    }
-    writer.flush();
-  }
-
-  private void commandSet(CliCommand command) throws ExecutorException{
-    String param = command.getParameters();
-    if (CliUtil.isNullOrEmpty(param)) {
-      env.printAll(writer);
-      writer.flush();
-      return;
-    }
-    String[] params = null;
-    boolean syntaxValid = param.split(" ").length == 1;
-    if(syntaxValid) {
-      params = param.split("=");
-      if(params.length == 1) {
-        String value = env.getEnvironmentVariable(param);
-        if(!CliUtil.isNullOrEmpty(value)) {
-          env.printVariable(writer, param, value);
-        }
-        return;
-      } else {
-        syntaxValid = params.length == 2;
-      }
-    }
-    if(!syntaxValid) {
-      writer.println(command.getCommandType().getUsage());
-      writer.flush();
-      return;
-    }
-
-    String name = params[0].trim().toLowerCase();
-    String value = params[1].trim();
-    int ret = env.setEnvironmentVariable(name, value);
-    if (ret == 0) {
-      writer.print(name);
-      writer.print(" set to ");
-      writer.println(value);
-      if(name.equals(CliConstants.CONFIG_EXECUTOR)) {
-        executor.stop(exeContext);
-        executor = env.getExecutor();
-        executor.start(exeContext);
-      }
-    } else if (ret == -1) {
-      writer.print("Unknow variable: ");
-      writer.println(name);
-    } else if (ret == -2) {
-      writer.print("Invalid value: ");
-      writer.print(value);
-      String[] vals = env.getPossibleValues(name);
-      if(vals != null && vals.length != 0) {
-        writer.print(" Possible values:");
-        for (String s : vals) {
-          writer.print(CliConstants.SPACE);
-          writer.print(s);
-        }
-        writer.println();
-      }
-    }
-
-    writer.flush();
-  }
-
-  private void commandExecuteFile(CliCommand command) throws ExecutorException{
-    String fullCmdStr = command.getFullCommand();
-    String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println("Usage: execute <fileuri>");
-      writer.flush();
-      return;
-    }
-    URI uri = null;
-    boolean valid = false;
-    File file = null;
-    try {
-      uri = new URI(parameters);
-      file = new File(uri.getPath());
-      valid = file.exists() && !file.isDirectory();
-    } catch (URISyntaxException e) {
-    }
-    if (!valid) {
-      writer.println("Invalid URI.");
-      writer.flush();
-      return;
-    }
-
-    NonQueryResult nonQueryResult = executor.executeNonQuery(exeContext, file);
-    executions.put(nonQueryResult.getExecutionId(), fullCmdStr);
-    List<String> submittedStmts = nonQueryResult.getSubmittedStmts();
-    List<String> nonsubmittedStmts = nonQueryResult.getNonSubmittedStmts();
-
-    writer.println("Sql file submitted. Execution ID: " + 
nonQueryResult.getExecutionId());
-    writer.println("Submitted statements:");
-    if (submittedStmts == null || submittedStmts.size() == 0) {
-      writer.println("\tNone.");
-    } else {
-      for (String statement : submittedStmts) {
-        writer.print("\t");
-        writer.println(statement);
-      }
-    }
-
-    if (nonsubmittedStmts != null && nonsubmittedStmts.size() != 0) {
-      writer.println("Statements NOT submitted:");
-      for (String statement : nonsubmittedStmts) {
-        writer.print("\t");
-        writer.println(statement);
-      }
-    }
-
-    writer.println("Note: All query statements in a sql file are NOT 
submitted.");
-    writer.flush();
-  }
-
-  private void commandInsertInto(CliCommand command) throws ExecutorException {
-    String fullCmdStr = command.getFullCommand();
-    NonQueryResult result = executor.executeNonQuery(exeContext,
-            Collections.singletonList(fullCmdStr));
-
-    writer.print("Execution submitted successfully. Id: ");
-    writer.println(String.valueOf(result.getExecutionId()));
-    executions.put(result.getExecutionId(), fullCmdStr);
-
-    writer.flush();
-  }
-
-  private void commandLs(CliCommand command) {
-    List<Integer> execIds = new ArrayList<>();
-    String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      execIds.addAll(executions.keySet());
-    } else {
-      String[] params = parameters.split("\u0020");
-      for (String param : params) {
-        Integer id = null;
-        try {
-          id = Integer.valueOf(param);
-        } catch (NumberFormatException e) {
-        }
-        if (id != null && executions.containsKey(id)) {
-          execIds.add(id);
-        }
-      }
-    }
-    if (execIds.size() == 0) {
-      return;
-    }
-
-    execIds.sort(Integer::compareTo);
-    final int terminalWidth = terminal.getWidth();
-    final int ID_WIDTH = 3;
-    final int STATUS_WIDTH = 20;
-    final int CMD_WIDTH = terminalWidth - ID_WIDTH - STATUS_WIDTH - 4;
-
-    AttributedStyle oddLineStyle = 
AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
-    AttributedStyle evenLineStyle = 
AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
-    for (int i = 0; i < execIds.size(); ++i) {
-      Integer id = execIds.get(i);
-      String cmd = executions.get(id);
-      if (cmd == null)
-        continue;
-
-      String status = "UNKNOWN";
-      try {
-        ExecutionStatus execStatus = executor.queryExecutionStatus(id);
-        status = execStatus.name();
-      } catch (ExecutorException e) {
-        LOG.error("Error in commandLs: ", e);
-      }
-
-      int cmdStartIdx = 0;
-      int cmdLength = cmd.length();
-      StringBuilder line;
-      while (cmdStartIdx < cmdLength) {
-        line = new StringBuilder(terminalWidth);
-        if (cmdStartIdx == 0) {
-          line.append(CliConstants.SPACE);
-          line.append(id);
-          CliUtil.appendTo(line, 1 + ID_WIDTH + 1, CliConstants.SPACE);
-          line.append(status);
-        }
-        CliUtil.appendTo(line, 1 + ID_WIDTH + 1 + STATUS_WIDTH + 1, 
CliConstants.SPACE);
-
-        int numToWrite = Math.min(CMD_WIDTH, cmdLength - cmdStartIdx);
-        if (numToWrite > 0) {
-          line.append(cmd, cmdStartIdx, cmdStartIdx + numToWrite);
-          cmdStartIdx += numToWrite;
-        }
-
-        if (i % 2 == 0) {
-          AttributedStringBuilder attrBuilder = new 
AttributedStringBuilder().style(evenLineStyle);
-          attrBuilder.append(line.toString());
-          writer.println(attrBuilder.toAnsi());
-        } else {
-          AttributedStringBuilder attrBuilder = new 
AttributedStringBuilder().style(oddLineStyle);
-          attrBuilder.append(line.toString());
-          writer.println(attrBuilder.toAnsi());
-        }
-      }
-    }
-    writer.flush();
-  }
-
-  private void commandRm(CliCommand command) {
-    String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println(command.getCommandType().getUsage());
-      writer.flush();
-      return;
-    }
-
-    List<Integer> execIds = new ArrayList<>();
-    String[] params = parameters.split("\u0020");
-    for (String param : params) {
-      Integer id = null;
-      try {
-        id = Integer.valueOf(param);
-      } catch (NumberFormatException e) {
-      }
-      if (id == null || !executions.containsKey(id)) {
-        writer.print("Error: ");
-        writer.print(param);
-        writer.println(" is not a valid id.");
-      } else {
-        execIds.add(id);
-      }
-    }
-
-    for (Integer id : execIds) {
-      try {
-        ExecutionStatus status = executor.queryExecutionStatus(id);
-        if (status == ExecutionStatus.Running) {
-          writer.println(String.format("Execution %d is still running. Stop it 
first.", id));
-          continue;
-        }
-        executor.removeExecution(exeContext, id);
-        executions.remove(id);
-      } catch (ExecutorException e) {
-        writer.println("Error: " + e);
-        LOG.error("Error in commandRm: ", e);
-      }
-    }
-    writer.flush();
-  }
-
-  private void commandQuit() {
-    keepRunning = false;
-  }
-
-  private void commandSelect(CliCommand command) throws ExecutorException{
-    QueryResult queryResult = executor.executeQuery(exeContext, 
command.getFullCommand());
-    CliView view = new QueryResultLogView();
-    view.open(this, queryResult);
-    executor.stopExecution(exeContext, queryResult.getExecutionId());
-  }
-
-  private void commandShowTables(CliCommand command) throws ExecutorException {
-    List<String> tableNames = executor.listTables(exeContext);
-    for (String tableName : tableNames) {
-      writer.println(tableName);
-    }
-    writer.flush();
-  }
-
-  private void commandShowFunctions(CliCommand command) throws 
ExecutorException {
-    List<SqlFunction> fns = executor.listFunctions(exeContext);
-    for (SqlFunction fn : fns) {
-      writer.println(fn.toString());
-    }
-    writer.flush();
-  }
-
-  private void commandStop(CliCommand command) {
-    String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println(command.getCommandType().getUsage());
-      writer.flush();
-      return;
-    }
-
-    List<Integer> execIds = new ArrayList<>();
-    String[] params = parameters.split("\u0020");
-    for (String param : params) {
-      Integer id = null;
-      try {
-        id = Integer.valueOf(param);
-      } catch (NumberFormatException e) {
-      }
-      if (id == null || !executions.containsKey(id)) {
-        writer.print("Error: ");
-        writer.print(param);
-        writer.println(" is not a valid id.");
-      } else {
-        execIds.add(id);
-      }
-    }
-
-    for (Integer id : execIds) {
-      try {
-        executor.stopExecution(exeContext, id);
-        writer.println(String.format("Request to stop execution %d was sent.", 
id));
-      } catch (ExecutorException e) {
-        writer.println("Error: " + e);
-        LOG.error("Error in commandStop: ", e);
-      }
-    }
-    writer.flush();
-  }
-
-  private void commandHelp(CliCommand command) {
-    String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      printHelpMessage();
-      return;
-    }
-
-    parameters = parameters.trim().toUpperCase();
-    for (CliCommandType cmdType : CliCommandType.values()) {
-      String cmdText = cmdType.getCommandName();
-      if (cmdText.equals(parameters)) {
-        writer.println(cmdType.getUsage());
-        writer.flush();
-        return;
-      }
-    }
-
-    writer.print("Unknown command: ");
-    writer.println(parameters);
-    writer.flush();
-  }
-
-
-  private CliCommand parseLine(String line) {
-    line = trimCommand(line);
-    if (CliUtil.isNullOrEmpty(line))
-      return null;
-
-    String upperCaseLine = line.toUpperCase();
-    for (CliCommandType cmdType : CliCommandType.values()) {
-      String cmdText = cmdType.getCommandName();
-      if (upperCaseLine.startsWith(cmdText)) {
-        if (upperCaseLine.length() == cmdText.length())
-          return new CliCommand(cmdType);
-        else if (upperCaseLine.charAt(cmdText.length()) <= CliConstants.SPACE) 
{
-          String parameter = line.substring(cmdText.length()).trim();
-          if (!parameter.isEmpty())
-            return new CliCommand(cmdType, parameter);
-        }
-      }
-    }
-    return new CliCommand(CliCommandType.INVALID_COMMAND);
+  private void clearScreen() {
+    terminal.puts(InfoCmp.Capability.clear_screen);
   }
 
   private void printHelpMessage() {
-    AttributedStringBuilder builder = new AttributedStringBuilder();
-    builder.append("The following commands are supported by ")
-            .append(CliConstants.APP_NAME)
-            .append(" at the moment.\n\n");
-
-    for (CliCommandType cmdType : CliCommandType.values()) {
-      if (cmdType == CliCommandType.INVALID_COMMAND)
-        continue;
-
-      String cmdText = cmdType.getCommandName();
-      String cmdDescription = cmdType.getDescription();
-
-      builder.style(AttributedStyle.DEFAULT.bold())
-              .append(cmdText)
-              .append("\t\t")
-              .style(AttributedStyle.DEFAULT)
-              .append(cmdDescription)
-              .append("\n");
+    for (CommandHandler commandHandler : commandHandlers) {
+      commandHandler.printHelpMessage();
     }
-
-    writer.println(builder.toAnsi());
     writer.println("HELP <COMMAND> to get help for a specific command.");
     writer.flush();
   }
 
-  private void clearScreen() {
-    terminal.puts(InfoCmp.Capability.clear_screen);
-  }
-
-  /*
-      Field    | Type
-      -------------------------
-      Field1   | Type 1
-      Field2   | VARCHAR(STRING)
-      Field... | VARCHAR(STRING)
-      -------------------------
-  */
-  private List<String> formatSchema4Display(SqlSchema schema) {
-    final String HEADER_FIELD = "Field";
-    final String HEADER_TYPE = "Type";
-    final char SEPERATOR = '|';
-    final char LINE_SEP = '-';
-
-    int terminalWidth = terminal.getWidth();
-    // Two spaces * 2 plus one SEPERATOR
-    if (terminalWidth < 2 + 2 + 1 + HEADER_FIELD.length() + 
HEADER_TYPE.length()) {
-      return Collections.singletonList("Not enough room.");
-    }
-
-    // Find the best seperator position for least rows
-    int seperatorPos = HEADER_FIELD.length() + 2;
-    int minRowNeeded = Integer.MAX_VALUE;
-    int longestLineCharNum = 0;
-    int rowCount = schema.getFields().size();
-    for (int j = seperatorPos; j < terminalWidth - HEADER_TYPE.length() - 2; 
++j) {
-      boolean fieldWrapped = false;
-      int rowNeeded = 0;
-      for (int i = 0; i < rowCount; ++i) {
-        SqlSchema.SqlField field = schema.getFields().get(i);
-        int fieldLen = field.getFieldName().length();
-        int typeLen = 
field.getFieldSchema().getFieldType().toString().length();
-        int fieldRowNeeded = CliUtil.ceilingDiv(fieldLen, j - 2);
-        int typeRowNeeded = CliUtil.ceilingDiv(typeLen, terminalWidth - 1 - j 
- 2);
-
-        rowNeeded += Math.max(fieldRowNeeded, typeRowNeeded);
-        fieldWrapped |= fieldRowNeeded > 1;
-        if (typeRowNeeded > 1) {
-          longestLineCharNum = terminalWidth;
-        } else {
-          longestLineCharNum = Math.max(longestLineCharNum, j + typeLen + 2 + 
1);
-        }
-      }
-      if (rowNeeded < minRowNeeded) {
-        minRowNeeded = rowNeeded;
-        seperatorPos = j;
-      }
-      if (!fieldWrapped)
+  private CommandAndHandler findHandlerForCommand(String line) {
+    CommandHandler commandHandler = null;
+    CliCommand parsedCommand = null;
+    for (CommandHandler curCommandHandler : commandHandlers) {
+      parsedCommand = curCommandHandler.parseLine(line);
+      if (parsedCommand != null && 
!parsedCommand.getCommandType().getCommandName().equals(CliCommandType.INVALID_COMMAND.getCommandName()))
 {
+        commandHandler = curCommandHandler;
+        LOG.info("Found commandHandler {} to handle command {}", 
commandHandler.getClass().getName(),
+            parsedCommand.getFullCommand());
         break;
-    }
-
-    List<String> lines = new ArrayList<>(minRowNeeded + 4);
-
-    // Header
-    StringBuilder line = new StringBuilder(terminalWidth);
-    line.append(CliConstants.SPACE);
-    line.append(HEADER_FIELD);
-    CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
-    line.append(SEPERATOR);
-    line.append(CliConstants.SPACE);
-    line.append(HEADER_TYPE);
-    lines.add(line.toString());
-    line = new StringBuilder(terminalWidth);
-    CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
-    lines.add(line.toString());
-
-    // Body
-    AttributedStyle oddLineStyle = 
AttributedStyle.BOLD.foreground(AttributedStyle.BLUE);
-    AttributedStyle evenLineStyle = 
AttributedStyle.BOLD.foreground(AttributedStyle.CYAN);
-
-    final int fieldColSize = seperatorPos - 2;
-    final int typeColSize = terminalWidth - seperatorPos - 1 - 2;
-    for (int i = 0; i < rowCount; ++i) {
-      SqlSchema.SqlField sqlField = schema.getFields().get(i);
-      String field = sqlField.getFieldName();
-      String type = getFieldDisplayValue(sqlField.getFieldSchema());
-      int fieldLen = field.length();
-      int typeLen = type.length();
-      int fieldStartIdx = 0, typeStartIdx = 0;
-      while (fieldStartIdx < fieldLen || typeStartIdx < typeLen) {
-        line = new StringBuilder(terminalWidth);
-        line.append(CliConstants.SPACE);
-        int numToWrite = Math.min(fieldColSize, fieldLen - fieldStartIdx);
-        if (numToWrite > 0) {
-          line.append(field, fieldStartIdx, fieldStartIdx + numToWrite);
-          fieldStartIdx += numToWrite;
-        }
-        CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
-        line.append(SEPERATOR);
-        line.append(CliConstants.SPACE);
-
-        numToWrite = Math.min(typeColSize, typeLen - typeStartIdx);
-        if (numToWrite > 0) {
-          line.append(type, typeStartIdx, typeStartIdx + numToWrite);
-          typeStartIdx += numToWrite;
-        }
-
-        if (i % 2 == 0) {
-          AttributedStringBuilder attrBuilder = new 
AttributedStringBuilder().style(evenLineStyle);
-          attrBuilder.append(line.toString());
-          lines.add(attrBuilder.toAnsi());
-        } else {
-          AttributedStringBuilder attrBuilder = new 
AttributedStringBuilder().style(oddLineStyle);
-          attrBuilder.append(line.toString());
-          lines.add(attrBuilder.toAnsi());
-        }
       }
     }
-
-    // Footer
-    line = new StringBuilder(terminalWidth);
-    CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
-    lines.add(line.toString());
-    return lines;
+    return new CommandAndHandler(parsedCommand, commandHandler);
   }
 
-  private String getFieldDisplayValue(SqlFieldSchema fieldSchema) {
-    if (!isComplexField(fieldSchema.getFieldType())) {
-      return fieldSchema.getFieldType().toString();
-    }
-    SamzaSqlFieldType fieldType = fieldSchema.getFieldType();
-    switch (fieldType) {
-      case ARRAY:
-        return String.format("ARRAY(%s)", 
getFieldDisplayValue(fieldSchema.getElementSchema()));
-      case MAP:
-        return String.format("MAP(%s, %s)", 
SamzaSqlFieldType.STRING.toString(),
-            getFieldDisplayValue(fieldSchema.getValueScehma()));
-      case ROW:
-        String rowDisplayValue = fieldSchema.getRowSchema()
-            .getFields()
-            .stream()
-            .map(f -> getFieldDisplayValue(f.getFieldSchema()))
-            .collect(Collectors.joining(","));
-        return String.format("ROW(%s)", rowDisplayValue);
-      default:
-        throw new UnsupportedOperationException("Unknown field type " + 
fieldType);
-    }
-  }
+  class CommandAndHandler {
+    CliCommand command;
+    CommandHandler handler;
 
-  private boolean isComplexField(SamzaSqlFieldType fieldtype) {
-    return fieldtype == SamzaSqlFieldType.ARRAY || fieldtype == 
SamzaSqlFieldType.MAP
-        || fieldtype == SamzaSqlFieldType.ROW;
-  }
-
-  // Trims: leading spaces; trailing spaces and ";"s
-  private String trimCommand(String command) {
-    if (CliUtil.isNullOrEmpty(command))
-      return command;
-
-    int len = command.length();
-    int st = 0;
-
-    while ((st < len) && (command.charAt(st) <= ' ')) {
-      st++;
-    }
-    while ((st < len) && ((command.charAt(len - 1) <= ' ')
-            || command.charAt(len - 1) == ';')) {
-      len--;
+    CommandAndHandler(CliCommand aCommand, CommandHandler itsHandler) {
+      command = aCommand;
+      handler = itsHandler;
     }
-    return ((st > 0) || (len < command.length())) ? command.substring(st, len) 
: command;
   }
-}
+}
\ No newline at end of file
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
index d647de2..02d4daf 100644
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.sql.client.cli;
 
-import org.apache.samza.sql.client.interfaces.ExecutorException;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
 import org.apache.samza.sql.client.interfaces.QueryResult;
 
 /**
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
index c84c630..6ee2cf3 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
@@ -23,7 +23,8 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 
-import org.apache.samza.sql.client.interfaces.ExecutorException;
+import org.apache.samza.sql.client.exceptions.CommandHandlerException;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
 import org.apache.samza.sql.client.util.CliUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +72,7 @@ public class Main {
             String key = strs[0].trim().toLowerCase();
             String value = strs[1].trim();
             try {
+              LOG.info("Configuration: setting {} = {}", key, value);
               int result = environment.setEnvironmentVariable(key, value);
               if (result == -1) { // CliEnvironment doesn't recognize the key.
                 LOG.warn("Unknowing shell environment variable: {}", key);
@@ -81,6 +83,9 @@ public class Main {
               messageBuilder.append("Warning: Failed to create executor: 
").append(value).append('\n');
               messageBuilder.append("Warning: Using default executor " + 
CliConstants.DEFAULT_EXECUTOR_CLASS);
               LOG.error("Failed to create user specified executor {}", value, 
e);
+            } catch (CommandHandlerException e) {
+              messageBuilder.append("Warning: Failed to create CommandHandler: 
").append(value).append('\n');
+              LOG.error("Failed to create user specified CommandHandler {}", 
value, e);
             }
           }
         } catch (IOException e) {
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
index a822477..6a8c787 100644
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
@@ -20,7 +20,7 @@
 package org.apache.samza.sql.client.cli;
 
 import org.apache.samza.sql.client.interfaces.ExecutionContext;
-import org.apache.samza.sql.client.interfaces.ExecutorException;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
 import org.apache.samza.sql.client.interfaces.QueryResult;
 import org.apache.samza.sql.client.interfaces.SqlExecutor;
 import org.jline.keymap.BindingReader;
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/CliException.java
similarity index 96%
copy from 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
copy to 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/CliException.java
index 743ff44..3406960 100755
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/CliException.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.sql.client.util;
+package org.apache.samza.sql.client.exceptions;
 
 /**
  * The exception used by the shell for unrecoverable errors.
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/CommandHandlerException.java
old mode 100755
new mode 100644
similarity index 69%
rename from 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
rename to 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/CommandHandlerException.java
index 743ff44..77fc48c
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/CommandHandlerException.java
@@ -17,25 +17,24 @@
  * under the License.
  */
 
-package org.apache.samza.sql.client.util;
+package org.apache.samza.sql.client.exceptions;
 
 /**
- * The exception used by the shell for unrecoverable errors.
+ * A CommandHandler throws a CommandHandlerException when it encounters an 
error.
  */
-public class CliException extends RuntimeException {
-  public CliException() {
-
+public class CommandHandlerException extends Exception {
+  public CommandHandlerException() {
   }
 
-  public CliException(String message) {
+  public CommandHandlerException(String message) {
     super(message);
   }
 
-  public CliException(String message, Throwable cause) {
+  public CommandHandlerException(String message, Throwable cause) {
     super(message, cause);
   }
 
-  public CliException(Throwable cause) {
+  public CommandHandlerException(Throwable cause) {
     super(cause);
   }
 }
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutorException.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/ExecutorException.java
similarity index 96%
copy from 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutorException.java
copy to 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/ExecutorException.java
index b1f08de..e582627 100644
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutorException.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/exceptions/ExecutorException.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.sql.client.interfaces;
+package org.apache.samza.sql.client.exceptions;
 
 /**
  * An executor shall throw an ExecutorException when it encounters an error.
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliCommandHandler.java
old mode 100755
new mode 100644
similarity index 64%
copy from 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
copy to 
samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliCommandHandler.java
index 6212d9b..9a1b8a1
--- 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliCommandHandler.java
@@ -17,251 +17,201 @@
  * under the License.
  */
 
-package org.apache.samza.sql.client.cli;
+package org.apache.samza.sql.client.impl;
 
-import java.util.stream.Collectors;
-import org.apache.samza.sql.client.interfaces.*;
-import org.apache.samza.sql.client.util.CliException;
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.samza.sql.client.cli.CliCommand;
+import org.apache.samza.sql.client.cli.CliConstants;
+import org.apache.samza.sql.client.cli.CliEnvironment;
+import org.apache.samza.sql.client.cli.CliShell;
+import org.apache.samza.sql.client.cli.CliView;
+import org.apache.samza.sql.client.cli.QueryResultLogView;
+import org.apache.samza.sql.client.exceptions.CommandHandlerException;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
+import org.apache.samza.sql.client.interfaces.CommandHandler;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.ExecutionStatus;
+import org.apache.samza.sql.client.interfaces.NonQueryResult;
+import org.apache.samza.sql.client.interfaces.QueryResult;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.apache.samza.sql.client.interfaces.SqlFunction;
 import org.apache.samza.sql.client.util.CliUtil;
-import org.apache.samza.sql.schema.SamzaSqlFieldType;
-import org.apache.samza.sql.schema.SqlFieldSchema;
 import org.apache.samza.sql.schema.SqlSchema;
-import org.jline.reader.EndOfFileException;
-import org.jline.reader.LineReader;
-import org.jline.reader.LineReaderBuilder;
-import org.jline.reader.UserInterruptException;
-import org.jline.reader.impl.DefaultParser;
-import org.jline.reader.impl.completer.StringsCompleter;
 import org.jline.terminal.Terminal;
-import org.jline.terminal.TerminalBuilder;
 import org.jline.utils.AttributedStringBuilder;
 import org.jline.utils.AttributedStyle;
 import org.jline.utils.InfoCmp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
 
-/**
- * The shell UI.
- */
-class CliShell {
-  private static final Logger LOG = LoggerFactory.getLogger(CliShell.class);
-  private final Terminal terminal;
-  private final PrintWriter writer;
-  private final LineReader lineReader;
-  private final String firstPrompt;
+public class CliCommandHandler implements CommandHandler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CliCommandHandler.class);
+  private CliShell shell;
+  private PrintWriter writer;
+  private Terminal terminal;
+  private ExecutionContext exeContext;
   private SqlExecutor executor;
-  private final ExecutionContext exeContext;
   private CliEnvironment env;
-  private boolean keepRunning = true;
   private Map<Integer, String> executions = new TreeMap<>();
 
-  CliShell(CliEnvironment environment) throws ExecutorException{
-    if (environment == null) {
-      throw new IllegalArgumentException();
-    }
-
-    // Terminal
-    try {
-      terminal = TerminalBuilder.builder()
-              .name(CliConstants.WINDOW_TITLE)
-              .build();
-    } catch (IOException e) {
-      throw new CliException("Error when creating terminal", e);
-    }
 
-    // Terminal writer
-    writer = terminal.writer();
-
-    // LineReader
-    final DefaultParser parser = new DefaultParser()
-            .eofOnEscapedNewLine(true)
-            .eofOnUnclosedQuote(true);
-    lineReader = LineReaderBuilder.builder()
-            .appName(CliConstants.APP_NAME)
-            .terminal(terminal)
-            .parser(parser)
-            .highlighter(new CliHighlighter())
-            .completer(new StringsCompleter(CliCommandType.getAllCommands()))
-            .build();
-
-    // Command Prompt
-    firstPrompt = new AttributedStringBuilder()
-            .style(AttributedStyle.DEFAULT.foreground(AttributedStyle.YELLOW))
-            .append(CliConstants.PROMPT_1ST + CliConstants.PROMPT_1ST_END)
-            .toAnsi();
-
-    // Execution context and executor
-    env = environment;
+  public void init(CliShell shell, CliEnvironment env, Terminal terminal, 
ExecutionContext exeContext) {
+    this.env = env;
     executor = env.getExecutor();
-    exeContext = new ExecutionContext();
-    executor.start(exeContext);
+    this.terminal = terminal;
+    this.writer = terminal.writer();
+    this.exeContext = exeContext;
+    this.shell = shell;
   }
 
-  Terminal getTerminal() {
-    return terminal;
-  }
+  public CliCommand parseLine(String line) {
+    line = CliUtil.trimCommand(line);
+    if (CliUtil.isNullOrEmpty(line))
+      return null;
 
-  SqlExecutor getExecutor() {
-    return executor;
+    String upperCaseLine = line.toUpperCase();
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      String cmdText = cmdType.getCommandName();
+      if (upperCaseLine.startsWith(cmdText)) {
+        if (upperCaseLine.length() == cmdText.length())
+          return new CliCommand(cmdType);
+        else if (upperCaseLine.charAt(cmdText.length()) <= CliConstants.SPACE) 
{
+          String parameter = line.substring(cmdText.length()).trim();
+          if (!parameter.isEmpty())
+            return new CliCommand(cmdType, parameter);
+        }
+      }
+    }
+    return new CliCommand(CliCommandType.INVALID_COMMAND);
   }
 
-  ExecutionContext getExeContext() {
-    return exeContext;
+  public void printHelpMessage() { // prints this handler's command list to the terminal writer
+    AttributedStringBuilder builder = new AttributedStringBuilder();
+    builder.append("The following commands are supported by ")
+        .append(CliConstants.APP_NAME)
+        .append(" by handler ") // leading space keeps the app name and "by" from running together
+        .append(this.getClass().getName())
+        .append(" at the moment.\n\n");
+
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      if (cmdType == CliCommandType.INVALID_COMMAND) // parse-failure marker, not a user command
+        continue;
+
+      String cmdText = cmdType.getCommandName();
+      String cmdDescription = cmdType.getDescription();
+
+      builder.style(AttributedStyle.DEFAULT.bold())
+          .append(cmdText)
+          .append("\t\t")
+          .style(AttributedStyle.DEFAULT)
+          .append(cmdDescription)
+          .append("\n");
+    }
+    writer.println(builder.toAnsi());
   }
 
-  /**
-   * Actually run the shell. Does not return until user choose to exit.
-   */
-  void open(String message) {
-    // Remember we cannot enter alternate screen mode here as there is only 
one alternate
-    // screen and we need it to show streaming results. Clear the screen 
instead.
-    clearScreen();
-    writer.write(CliConstants.WELCOME_MESSAGE);
-    printVersion();
-    if(!CliUtil.isNullOrEmpty(message)) {
-      writer.println(message);
+
+  public boolean handleCommand(CliCommand command) throws 
CommandHandlerException {
+    boolean keepRunning = true;
+
+    if (!command.getCommandType().argsAreOptional() && 
CliUtil.isNullOrEmpty(command.getParameters())) {
+      CliUtil.printCommandUsage(command, writer);
+      return true;
     }
-    writer.println();
 
     try {
-      // Check if jna.jar exists in class path
-      try {
-        
ClassLoader.getSystemClassLoader().loadClass("com.sun.jna.NativeLibrary");
-      } catch (ClassNotFoundException e) {
-        // Something's wrong. It could be a dumb terminal if neither jna nor 
jansi lib is there
-        writer.write("Warning: jna.jar does NOT exist. It may lead to a dumb 
shell or a performance hit.\n");
-      }
+      switch ((CliCommandType) command.getCommandType()) {
+        case CLEAR:
+          commandClear();
+          break;
 
-      while (keepRunning) {
-        String line;
-        try {
-          line = lineReader.readLine(firstPrompt);
-        } catch (UserInterruptException e) {
-          continue;
-        } catch (EndOfFileException e) {
-          commandQuit();
+        case DESCRIBE:
+          commandDescribe(command);
           break;
-        }
 
-        if (CliUtil.isNullOrEmpty(line))
-          continue;
+        case EXECUTE:
+          commandExecuteFile(command);
+          break;
 
-        CliCommand command = parseLine(line);
-        if (command == null)
-          continue;
+        case EXIT:
+        case QUIT:
+          keepRunning = false;
+          break;
+
+        case HELP:
+          commandHelp(command);
+          break;
+
+        case INSERT_INTO:
+          commandInsertInto(command);
+          break;
+
+        case LS:
+          commandLs(command);
+          break;
+
+        case RM:
+          commandRm(command);
+          break;
+
+        case SELECT:
+          commandSelect(command);
+          break;
+
+        case SET:
+          commandSet(command);
+          break;
+
+        case SHOW_FUNCTIONS:
+          commandShowFunctions(command);
+          break;
 
-        try {
-          switch (command.getCommandType()) {
-            case CLEAR:
-              commandClear();
-              break;
-
-            case DESCRIBE:
-              commandDescribe(command);
-              break;
-
-            case EXECUTE:
-              commandExecuteFile(command);
-              break;
-
-            case EXIT:
-            case QUIT:
-              commandQuit();
-              break;
-
-            case HELP:
-              commandHelp(command);
-              break;
-
-            case INSERT_INTO:
-              commandInsertInto(command);
-              break;
-
-            case LS:
-              commandLs(command);
-              break;
-
-            case RM:
-              commandRm(command);
-              break;
-
-            case SELECT:
-              commandSelect(command);
-              break;
-
-            case SET:
-              commandSet(command);
-              break;
-
-            case SHOW_FUNCTIONS:
-              commandShowFunctions(command);
-              break;
-
-            case SHOW_TABLES:
-              commandShowTables(command);
-              break;
-
-            case STOP:
-              commandStop(command);
-              break;
-
-            case VERSION:
-              commandVersion(command);
-              break;
-
-            case INVALID_COMMAND:
-              printHelpMessage();
-              break;
-
-            default:
-              writer.println("UNDER DEVELOPEMENT. Command:" + 
command.getCommandType());
-              writer.println("Parameters:" +
-                      (CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" 
: command.getParameters()));
-              writer.flush();
-          }
-        } catch (ExecutorException e) {
-          writer.println("Error: " + e);
-          LOG.error("Error in {}: ", command.getCommandType(), e);
+        case SHOW_TABLES:
+          commandShowTables(command);
+          break;
+
+        case STOP:
+          commandStop(command);
+          break;
+
+        case VERSION:
+          commandVersion();
+          break;
+
+        case INVALID_COMMAND:
+          printHelpMessage();
+          break;
+
+        default:
+          writer.println("UNDER DEVELOPEMENT. Command:" + 
command.getCommandType());
+          writer.println("Parameters:" + 
(CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" : 
command.getParameters()));
           writer.flush();
-        }
       }
     } catch (Exception e) {
-      writer.print(e.getClass().getSimpleName());
-      writer.print(". ");
-      writer.println(e.getMessage());
-      e.printStackTrace(writer);
-      writer.println();
-      writer.println("We are sorry but SamzaSqlShell has encountered a problem 
and must stop.");
-    }
-
-    writer.write("Cleaning up... ");
-    writer.flush();
-    try {
-      executor.stop(exeContext);
-      writer.write("Done.\nBye.\n\n");
-      writer.flush();
-      terminal.close();
-    } catch (IOException | ExecutorException e) {
-      // Doesn't matter
+      throw new CommandHandlerException(e);
     }
+    return keepRunning;
   }
 
-  private void commandVersion(CliCommand command) {
+
+  private void commandVersion() {
     printVersion();
   }
 
   private void printVersion() {
     String version = String.format("Shell version %s, Executor is %s, version 
%s",
-            this.getClass().getPackage().getImplementationVersion(),
-            executor.getClass().getName(),
-            executor.getVersion());
+        this.getClass().getPackage().getImplementationVersion(),
+        executor.getClass().getName(),
+        executor.getVersion());
     writer.println(version);
   }
 
@@ -271,12 +221,6 @@ class CliShell {
 
   private void commandDescribe(CliCommand command) throws ExecutorException {
     String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println(command.getCommandType().getUsage());
-      writer.flush();
-      return;
-    }
-
     SqlSchema schema = executor.getTableSchema(exeContext, parameters);
     List<String> lines = formatSchema4Display(schema);
     for (String line : lines) {
@@ -285,7 +229,7 @@ class CliShell {
     writer.flush();
   }
 
-  private void commandSet(CliCommand command) throws ExecutorException{
+  private void commandSet(CliCommand command) throws Exception {
     String param = command.getParameters();
     if (CliUtil.isNullOrEmpty(param)) {
       env.printAll(writer);
@@ -347,11 +291,6 @@ class CliShell {
   private void commandExecuteFile(CliCommand command) throws ExecutorException{
     String fullCmdStr = command.getFullCommand();
     String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println("Usage: execute <fileuri>");
-      writer.flush();
-      return;
-    }
     URI uri = null;
     boolean valid = false;
     File file = null;
@@ -398,7 +337,7 @@ class CliShell {
   private void commandInsertInto(CliCommand command) throws ExecutorException {
     String fullCmdStr = command.getFullCommand();
     NonQueryResult result = executor.executeNonQuery(exeContext,
-            Collections.singletonList(fullCmdStr));
+        Collections.singletonList(fullCmdStr));
 
     writer.print("Execution submitted successfully. Id: ");
     writer.println(String.valueOf(result.getExecutionId()));
@@ -413,17 +352,7 @@ class CliShell {
     if (CliUtil.isNullOrEmpty(parameters)) {
       execIds.addAll(executions.keySet());
     } else {
-      String[] params = parameters.split("\u0020");
-      for (String param : params) {
-        Integer id = null;
-        try {
-          id = Integer.valueOf(param);
-        } catch (NumberFormatException e) {
-        }
-        if (id != null && executions.containsKey(id)) {
-          execIds.add(id);
-        }
-      }
+      execIds.addAll(splitExecutionIds(parameters));
     }
     if (execIds.size() == 0) {
       return;
@@ -486,29 +415,8 @@ class CliShell {
 
   private void commandRm(CliCommand command) {
     String parameters = command.getParameters();
-    if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println(command.getCommandType().getUsage());
-      writer.flush();
-      return;
-    }
-
     List<Integer> execIds = new ArrayList<>();
-    String[] params = parameters.split("\u0020");
-    for (String param : params) {
-      Integer id = null;
-      try {
-        id = Integer.valueOf(param);
-      } catch (NumberFormatException e) {
-      }
-      if (id == null || !executions.containsKey(id)) {
-        writer.print("Error: ");
-        writer.print(param);
-        writer.println(" is not a valid id.");
-      } else {
-        execIds.add(id);
-      }
-    }
-
+    execIds.addAll(splitExecutionIds(parameters));
     for (Integer id : execIds) {
       try {
         ExecutionStatus status = executor.queryExecutionStatus(id);
@@ -526,14 +434,10 @@ class CliShell {
     writer.flush();
   }
 
-  private void commandQuit() {
-    keepRunning = false;
-  }
-
   private void commandSelect(CliCommand command) throws ExecutorException{
     QueryResult queryResult = executor.executeQuery(exeContext, 
command.getFullCommand());
     CliView view = new QueryResultLogView();
-    view.open(this, queryResult);
+    view.open(shell, queryResult);
     executor.stopExecution(exeContext, queryResult.getExecutionId());
   }
 
@@ -556,8 +460,7 @@ class CliShell {
   private void commandStop(CliCommand command) {
     String parameters = command.getParameters();
     if (CliUtil.isNullOrEmpty(parameters)) {
-      writer.println(command.getCommandType().getUsage());
-      writer.flush();
+      CliUtil.printCommandUsage(command, writer);
       return;
     }
 
@@ -612,54 +515,6 @@ class CliShell {
     writer.flush();
   }
 
-
-  private CliCommand parseLine(String line) {
-    line = trimCommand(line);
-    if (CliUtil.isNullOrEmpty(line))
-      return null;
-
-    String upperCaseLine = line.toUpperCase();
-    for (CliCommandType cmdType : CliCommandType.values()) {
-      String cmdText = cmdType.getCommandName();
-      if (upperCaseLine.startsWith(cmdText)) {
-        if (upperCaseLine.length() == cmdText.length())
-          return new CliCommand(cmdType);
-        else if (upperCaseLine.charAt(cmdText.length()) <= CliConstants.SPACE) {
-          String parameter = line.substring(cmdText.length()).trim();
-          if (!parameter.isEmpty())
-            return new CliCommand(cmdType, parameter);
-        }
-      }
-    }
-    return new CliCommand(CliCommandType.INVALID_COMMAND);
-  }
-
-  private void printHelpMessage() {
-    AttributedStringBuilder builder = new AttributedStringBuilder();
-    builder.append("The following commands are supported by ")
-            .append(CliConstants.APP_NAME)
-            .append(" at the moment.\n\n");
-
-    for (CliCommandType cmdType : CliCommandType.values()) {
-      if (cmdType == CliCommandType.INVALID_COMMAND)
-        continue;
-
-      String cmdText = cmdType.getCommandName();
-      String cmdDescription = cmdType.getDescription();
-
-      builder.style(AttributedStyle.DEFAULT.bold())
-              .append(cmdText)
-              .append("\t\t")
-              .style(AttributedStyle.DEFAULT)
-              .append(cmdDescription)
-              .append("\n");
-    }
-
-    writer.println(builder.toAnsi());
-    writer.println("HELP <COMMAND> to get help for a specific command.");
-    writer.flush();
-  }
-
   private void clearScreen() {
     terminal.puts(InfoCmp.Capability.clear_screen);
   }
@@ -739,7 +594,7 @@ class CliShell {
     for (int i = 0; i < rowCount; ++i) {
       SqlSchema.SqlField sqlField = schema.getFields().get(i);
       String field = sqlField.getFieldName();
-      String type = getFieldDisplayValue(sqlField.getFieldSchema());
+      String type = sqlField.getFieldSchema().getFieldType().toString();
       int fieldLen = field.length();
       int typeLen = type.length();
       int fieldStartIdx = 0, typeStartIdx = 0;
@@ -780,49 +635,23 @@ class CliShell {
     return lines;
   }
 
-  private String getFieldDisplayValue(SqlFieldSchema fieldSchema) {
-    if (!isComplexField(fieldSchema.getFieldType())) {
-      return fieldSchema.getFieldType().toString();
-    }
-    SamzaSqlFieldType fieldType = fieldSchema.getFieldType();
-    switch (fieldType) {
-      case ARRAY:
-        return String.format("ARRAY(%s)", getFieldDisplayValue(fieldSchema.getElementSchema()));
-      case MAP:
-        return String.format("MAP(%s, %s)", SamzaSqlFieldType.STRING.toString(),
-            getFieldDisplayValue(fieldSchema.getValueScehma()));
-      case ROW:
-        String rowDisplayValue = fieldSchema.getRowSchema()
-            .getFields()
-            .stream()
-            .map(f -> getFieldDisplayValue(f.getFieldSchema()))
-            .collect(Collectors.joining(","));
-        return String.format("ROW(%s)", rowDisplayValue);
-      default:
-        throw new UnsupportedOperationException("Unknown field type " + fieldType);
-    }
-  }
-
-  private boolean isComplexField(SamzaSqlFieldType fieldtype) {
-    return fieldtype == SamzaSqlFieldType.ARRAY || fieldtype == SamzaSqlFieldType.MAP
-        || fieldtype == SamzaSqlFieldType.ROW;
-  }
-
-  // Trims: leading spaces; trailing spaces and ";"s
-  private String trimCommand(String command) {
-    if (CliUtil.isNullOrEmpty(command))
-      return command;
-
-    int len = command.length();
-    int st = 0;
-
-    while ((st < len) && (command.charAt(st) <= ' ')) {
-      st++;
-    }
-    while ((st < len) && ((command.charAt(len - 1) <= ' ')
-            || command.charAt(len - 1) == ';')) {
-      len--;
+  private List<Integer> splitExecutionIds(String parameters) {
+    List<Integer> execIds = new ArrayList<>();
+    String[] params = parameters.split("\u0020");
+    for (String param : params) {
+      Integer id = null;
+      try {
+        id = Integer.valueOf(param);
+      } catch (NumberFormatException e) {
+      }
+      if (id == null || !executions.containsKey(id)) {
+        writer.print("Error: ");
+        writer.print(param);
+        writer.println(" is not a valid id.");
+      } else {
+        execIds.add(id);
+      }
     }
-    return ((st > 0) || (len < command.length())) ? command.substring(st, len) : command;
+    return execIds;
   }
-}
+}
\ No newline at end of file
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliCommandType.java
similarity index 72%
rename from samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
rename to samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliCommandType.java
index f153cef..ae0bb90 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliCommandType.java
@@ -17,43 +17,46 @@
  * under the License.
  */
 
-package org.apache.samza.sql.client.cli;
+package org.apache.samza.sql.client.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.samza.sql.client.interfaces.CommandType;
 
 /**
  * Enum all the commands we now support along with descriptions.
  */
-enum CliCommandType {
-  SHOW_TABLES("SHOW TABLES", "Shows all available tables.", "Usage: SHOW TABLES <table name>"),
-  SHOW_FUNCTIONS("SHOW FUNCTIONS", "Shows all available UDFs.", "SHOW FUNCTION"),
-  DESCRIBE("DESCRIBE", "Describes a table.", "Usage: DESCRIBE <table name>"),
+public enum CliCommandType implements CommandType {
+  SHOW_TABLES("SHOW TABLES", "Shows all available tables.", "Usage: SHOW TABLES", true),
+  SHOW_FUNCTIONS("SHOW FUNCTIONS", "Shows all available UDFs.", "SHOW FUNCTION", true),
+  DESCRIBE("DESCRIBE", "Describes a table.", "Usage: DESCRIBE <table name>", false),
 
-  SELECT("SELECT", "\tExecutes a SQL SELECT query.", "SELECT uses a standard streaming SQL syntax."),
-  EXECUTE("EXECUTE", "\tExecute a sql file.", "EXECUTE <URI of a sql file>"),
-  INSERT_INTO("INSERT INTO", "Executes a SQL INSERT INTO.", "INSERT INTO uses a standard streaming SQL syntax."),
-  LS("LS", "\tLists all background executions.", "LS [execution ID]"),
-  STOP("STOP", "\tStops an execution.", "Usage: STOP <execution ID>"),
-  RM("RM", "\tRemoves an execution from the list.", "Usage: RM <execution ID>"),
+  SELECT("SELECT", "\tExecutes a SQL SELECT query.", "SELECT uses a standard streaming SQL syntax.", false),
+  EXECUTE("EXECUTE", "\tExecute a sql file.", "EXECUTE <URI of a sql file>", false),
+  INSERT_INTO("INSERT INTO", "Executes a SQL INSERT INTO.", "INSERT INTO uses a standard streaming SQL syntax.", false),
+  LS("LS", "\tLists all background executions.", "LS [execution ID]", true),
+  STOP("STOP", "\tStops an execution.", "Usage: STOP <execution ID>", false),
+  RM("RM", "\tRemoves an execution from the list.", "Usage: RM <execution ID>", false),
 
-  HELP("HELP", "\tDisplays this help message.", "Usage: HELP [command]"),
-  SET("SET", "\tSets a variable.", "Usage: SET VAR=VAL"),
-  CLEAR("CLEAR", "\tClears the screen.", "CLEAR"),
-  EXIT("EXIT", "\tExits the shell.", "Exit"),
-  QUIT("QUIT", "\tQuits the shell.", "QUIT"),
-  VERSION("VERSION", "\tShows version information", "VERSION"),
+  HELP("HELP", "\tDisplays this help message.", "Usage: HELP [command]", true),
+  SET("SET", "\tSets a variable.", "Usage: SET VAR=VAL", true),
+  CLEAR("CLEAR", "\tClears the screen.", "CLEAR", true),
+  EXIT("EXIT", "\tExits the shell.", "Exit", true),
+  QUIT("QUIT", "\tQuits the shell.", "QUIT", true),
+  VERSION("VERSION", "\tShows version information", "VERSION", true),
 
-  INVALID_COMMAND("INVALID_COMMAND", "INVALID_COMMAND", "INVALID_COMMAND");
+  INVALID_COMMAND("INVALID_COMMAND", "INVALID_COMMAND", "INVALID_COMMAND", true);
 
   private final String cmdName;
   private final String description;
   private final String usage;
+  private final boolean argsAreOptional;
 
-  CliCommandType(String cmdName, String description, String usage) {
+  CliCommandType(String cmdName, String description, String usage, boolean noArgs) {
     this.cmdName = cmdName;
     this.description = description;
     this.usage = usage;
+    this.argsAreOptional = noArgs;
   }
 
   public static List<String> getAllCommands() {
@@ -76,4 +79,8 @@ enum CliCommandType {
   public String getUsage() {
     return usage;
   }
+
+  public boolean argsAreOptional() {
+    return argsAreOptional;
+  }
 }
\ No newline at end of file
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 181f0f7..2f761f5 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -31,6 +31,8 @@ import org.apache.samza.config.*;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
+import org.apache.samza.sql.client.interfaces.ExecutionStatus;
 import org.apache.samza.sql.client.interfaces.*;
 import org.apache.samza.sql.client.util.Pair;
 import org.apache.samza.sql.client.util.RandomAccessQueue;
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutorException.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/CommandHandler.java
similarity index 61%
rename from samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutorException.java
rename to samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/CommandHandler.java
index b1f08de..ae0d643 100644
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutorException.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/CommandHandler.java
@@ -19,22 +19,16 @@
 
 package org.apache.samza.sql.client.interfaces;
 
-/**
- * An executor shall throw an ExecutorException when it encounters an error.
- */
-public class ExecutorException extends Exception {
-  public ExecutorException() {
-  }
-
-  public ExecutorException(String message) {
-    super(message);
-  }
+import org.apache.samza.sql.client.cli.CliCommand;
+import org.apache.samza.sql.client.cli.CliEnvironment;
+import org.apache.samza.sql.client.cli.CliShell;
+import org.apache.samza.sql.client.exceptions.CommandHandlerException;
+import org.jline.terminal.Terminal;
 
-  public ExecutorException(String message, Throwable cause) {
-    super(message, cause);
-  }
 
-  public ExecutorException(Throwable cause) {
-    super(cause);
-  }
-}
+public interface CommandHandler {
+  void init(CliShell shell, CliEnvironment env, Terminal terminal, ExecutionContext exeContext);
+  CliCommand parseLine(String line);
+  boolean handleCommand(CliCommand command) throws CommandHandlerException;
+  void printHelpMessage();
+}
\ No newline at end of file
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/CommandType.java
similarity index 69%
copy from samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
copy to samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/CommandType.java
index d647de2..9e42715 100644
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/CommandType.java
@@ -17,14 +17,20 @@
  * under the License.
  */
 
-package org.apache.samza.sql.client.cli;
+package org.apache.samza.sql.client.interfaces;
 
-import org.apache.samza.sql.client.interfaces.ExecutorException;
-import org.apache.samza.sql.client.interfaces.QueryResult;
+import java.util.ArrayList;
+import java.util.List;
 
-/**
- * For displaying the streaming result of a SELECT statement.
- */
-public interface CliView {
-  public void open(CliShell shell, QueryResult queryResult) throws ExecutorException;
-}
+public interface CommandType {
+
+  static List<String> getAllCommands() { return new ArrayList<>(); }
+
+  String getCommandName();
+
+  String getDescription();
+
+  String getUsage();
+
+  boolean argsAreOptional();
+}
\ No newline at end of file
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/EnvironmentVariableHandlerImpl.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/EnvironmentVariableHandlerImpl.java
index fbce105..141d8d2 100644
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/EnvironmentVariableHandlerImpl.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/EnvironmentVariableHandlerImpl.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.sql.client.interfaces;
 
-import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.exceptions.CliException;
 import org.apache.samza.sql.client.util.Pair;
 
 import java.util.*;
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
index fb7d109..765451a 100644
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
@@ -22,6 +22,7 @@ package org.apache.samza.sql.client.interfaces;
 
 import java.io.File;
 import java.util.List;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
 import org.apache.samza.sql.schema.SqlSchema;
 
 
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
index 03fe1b1..b8bf05c 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.sql.client.util;
 
+import java.io.PrintWriter;
+import org.apache.samza.sql.client.cli.CliCommand;
+
 /**
  * Convenient utility class with static methods.
  */
@@ -40,4 +43,27 @@ public class CliUtil {
     }
     return builder;
   }
+
+  // Trims: leading spaces; trailing spaces and ";"s
+  public static String trimCommand(String command) {
+    if (CliUtil.isNullOrEmpty(command))
+      return command;
+
+    int len = command.length();
+    int st = 0;
+
+    while ((st < len) && (command.charAt(st) <= ' ')) {
+      st++;
+    }
+    while ((st < len) && ((command.charAt(len - 1) <= ' ')
+        || command.charAt(len - 1) == ';')) {
+      len--;
+    }
+    return ((st > 0) || (len < command.length())) ? command.substring(st, len) : command;
+  }
+
+  public static void printCommandUsage(CliCommand command, PrintWriter writer) {
+    writer.println(command.getCommandType().getUsage());
+    writer.flush();
+  }
 }
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
index 789d64c..a41021d 100644
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
@@ -22,6 +22,8 @@ package org.apache.samza.sql.client.util;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.samza.sql.client.exceptions.CliException;
+
 
 /**
  * A queue that supports random access and consumption.
diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
index e3007f1..ac2d45d 100644
--- a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
+++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.client.interfaces.EnvironmentVariableHandler;
 import org.apache.samza.sql.client.interfaces.ExecutionContext;
-import org.apache.samza.sql.client.interfaces.ExecutorException;
+import org.apache.samza.sql.client.exceptions.ExecutorException;
 import org.apache.samza.sql.schema.SqlSchema;
 import org.junit.Assert;
 import org.junit.Ignore;

Reply via email to