Repository: samza
Updated Branches:
  refs/heads/1.0.0 ba90a51c8 -> b8e7103be


http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
new file mode 100755
index 0000000..7c2ca32
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.*;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.client.interfaces.*;
+import org.apache.samza.sql.client.util.RandomAccessQueue;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
+import org.apache.samza.sql.fn.FlattenUdf;
+import org.apache.samza.sql.fn.RegexMatchUdf;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.kafka.KafkaSystemFactory;
+import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;
+import org.apache.samza.tools.avro.AvroSerDeFactory;
+import org.apache.samza.tools.json.JsonRelConverterFactory;
+import org.apache.samza.tools.schemas.ProfileChangeEvent;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+
+/**
+ * Samza implementation of Executor for Samza SQL Shell.
+ */
+public class SamzaExecutor implements SqlExecutor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaExecutor.class);
+
+  private static final String SAMZA_SYSTEM_LOG = "log";
+  private static final String SAMZA_SYSTEM_KAFKA = "kafka";
+  private static final String SAMZA_SQL_OUTPUT = "samza.sql.output";
+  private static final String SAMZA_SQL_SYSTEM_KAFKA_ADDRESS = 
"samza.sql.system.kafka.address";
+  private static final String DEFAULT_SERVER_ADDRESS = "localhost:2181";
+
+  // The maximum number of rows of data we keep when user pauses the display 
view and data accumulates.
+  private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
+  private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
+
+  private static RandomAccessQueue<OutgoingMessageEnvelope> outputData =
+          new RandomAccessQueue<>(OutgoingMessageEnvelope.class, 
RANDOM_ACCESS_QUEUE_CAPACITY);
+  private static AtomicInteger execIdSeq = new AtomicInteger(0);
+  private Map<Integer, SamzaSqlApplicationRunner> executions = new HashMap<>();
+  private String lastErrorMsg = "";
+
+  // -- implementation of SqlExecutor 
------------------------------------------
+
+  @Override
+  public void start(ExecutionContext context) {
+  }
+
+  @Override
+  public void stop(ExecutionContext context) {
+    Iterator<Integer> iter = executions.keySet().iterator();
+    while (iter.hasNext()) {
+      stopExecution(context, iter.next());
+      iter.remove();
+    }
+    outputData.clear();
+  }
+
+  @Override
+  public List<String> listTables(ExecutionContext context) {
+    /**
+     * TODO: currently Shell can only talk to Kafka system, but we should use 
a general way
+     *       to connect to different systems.
+     */
+    lastErrorMsg = "";
+    String address = 
context.getConfigMap().getOrDefault(SAMZA_SQL_SYSTEM_KAFKA_ADDRESS, 
DEFAULT_SERVER_ADDRESS);
+    List<String> tables = null;
+    try {
+      ZkUtils zkUtils = new ZkUtils(new ZkClient(address, 
DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT),
+          new ZkConnection(address), false);
+      tables = JavaConversions.seqAsJavaList(zkUtils.getAllTopics())
+        .stream()
+        .map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
+        .collect(Collectors.toList());
+    } catch (ZkTimeoutException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+    }
+    return tables;
+  }
+
+  @Override
+  public SqlSchema getTableSchema(ExecutionContext context, String tableName) {
+    /**
+     *  currently Shell works only for systems that has Avro schemas
+     */
+    lastErrorMsg = "";
+    int execId = execIdSeq.incrementAndGet();
+    Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+    Config samzaSqlConfig = new MapConfig(staticConfigs);
+    SqlSchema sqlSchema = null;
+    try {
+      SqlIOResolver ioResolver = 
SamzaSqlApplicationConfig.createIOResolver(samzaSqlConfig);
+      SqlIOConfig sourceInfo = ioResolver.fetchSourceInfo(tableName);
+      RelSchemaProvider schemaProvider =
+              SamzaSqlApplicationConfig.initializePlugin("RelSchemaProvider", 
sourceInfo.getRelSchemaProviderName(),
+                      samzaSqlConfig, 
SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
+                      (o, c) -> ((RelSchemaProviderFactory) 
o).create(sourceInfo.getSystemStream(), c));
+      AvroRelSchemaProvider avroSchemaProvider = (AvroRelSchemaProvider) 
schemaProvider;
+      String schema = 
avroSchemaProvider.getSchema(sourceInfo.getSystemStream());
+      sqlSchema = AvroSqlSchemaConverter.convertAvroToSamzaSqlSchema(schema);
+    } catch (SamzaException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+    }
+    return sqlSchema;
+  }
+
+  @Override
+  public QueryResult executeQuery(ExecutionContext context, String statement) {
+    lastErrorMsg = "";
+    outputData.clear();
+
+    int execId = execIdSeq.incrementAndGet();
+    Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+    List<String> sqlStmts = 
formatSqlStmts(Collections.singletonList(statement));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+
+    SamzaSqlApplicationRunner runner;
+    try {
+      runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
+      runner.run();
+    } catch (SamzaException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+      return new QueryResult(execId, null, false);
+    }
+    executions.put(execId, runner);
+    LOG.debug("Executing sql. Id ", execId);
+
+    return new QueryResult(execId, generateResultSchema(new 
MapConfig(staticConfigs)), true);
+  }
+
+  @Override
+  public int getRowCount() {
+    return outputData.getSize();
+  }
+
+  @Override
+  public List<String[]> retrieveQueryResult(ExecutionContext context, int 
startRow, int endRow) {
+    List<String[]> results = new ArrayList<>();
+    for (OutgoingMessageEnvelope row : outputData.get(startRow, endRow)) {
+      results.add(getFormattedRow(context, row));
+    }
+    return results;
+  }
+
+  @Override
+  public List<String[]> consumeQueryResult(ExecutionContext context, int 
startRow, int endRow) {
+    List<String[]> results = new ArrayList<>();
+    for (OutgoingMessageEnvelope row : outputData.consume(startRow, endRow)) {
+      results.add(getFormattedRow(context, row));
+    }
+    return results;
+  }
+
+  @Override
+  public NonQueryResult executeNonQuery(ExecutionContext context, File 
sqlFile) {
+    lastErrorMsg = "";
+
+    LOG.info("Sql file path: " + sqlFile.getPath());
+    List<String> executedStmts = new ArrayList<>();
+    try {
+      executedStmts = 
Files.lines(Paths.get(sqlFile.getPath())).collect(Collectors.toList());
+    } catch (IOException e) {
+      lastErrorMsg = String.format("Unable to parse the sql file %s. %s", 
sqlFile.getPath(), e.toString());
+      LOG.error(lastErrorMsg);
+      return new NonQueryResult(-1, false);
+    }
+    LOG.info("Sql statements in Sql file: " + executedStmts.toString());
+
+    List<String> submittedStmts = new ArrayList<>();
+    List<String> nonSubmittedStmts = new ArrayList<>();
+    validateExecutedStmts(executedStmts, submittedStmts, nonSubmittedStmts);
+    if (submittedStmts.isEmpty()) {
+      lastErrorMsg = "Nothing to execute. Note: SELECT statements are 
ignored.";
+      LOG.warn("Nothing to execute. Statements in the Sql file: {}", 
nonSubmittedStmts);
+      return new NonQueryResult(-1, false);
+    }
+    NonQueryResult result = executeNonQuery(context, submittedStmts);
+    return new NonQueryResult(result.getExecutionId(), result.succeeded(), 
submittedStmts, nonSubmittedStmts);
+  }
+
+  @Override
+  public NonQueryResult executeNonQuery(ExecutionContext context, List<String> 
statement) {
+    lastErrorMsg = "";
+
+    int execId = execIdSeq.incrementAndGet();
+    Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(formatSqlStmts(statement)));
+
+    SamzaSqlApplicationRunner runner;
+    try {
+      runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
+      runner.run();
+    } catch (SamzaException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+      return new NonQueryResult(execId, false);
+    }
+    executions.put(execId, runner);
+    LOG.debug("Executing sql. Id ", execId);
+
+    return new NonQueryResult(execId, true);
+  }
+
+  @Override
+  public boolean stopExecution(ExecutionContext context, int exeId) {
+    lastErrorMsg = "";
+
+    SamzaSqlApplicationRunner runner = executions.get(exeId);
+    if (runner != null) {
+      LOG.debug("Stopping execution ", exeId);
+
+      try {
+        runner.kill();
+      } catch (SamzaException ex) {
+        lastErrorMsg = ex.toString();
+        LOG.debug(lastErrorMsg);
+        return false;
+      }
+
+      try {
+        Thread.sleep(500); // wait for a second
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      return true;
+    } else {
+      lastErrorMsg = "Trying to stop a non-existing SQL execution " + exeId;
+      LOG.warn(lastErrorMsg);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean removeExecution(ExecutionContext context, int exeId) {
+    lastErrorMsg = "";
+
+    SamzaSqlApplicationRunner runner = executions.get(exeId);
+    if (runner != null) {
+      if 
(runner.status().getStatusCode().equals(ApplicationStatus.StatusCode.Running)) {
+        lastErrorMsg = "Trying to remove a ongoing execution " + exeId;
+        LOG.error(lastErrorMsg);
+        return false;
+      }
+      executions.remove(exeId);
+      LOG.debug("Stopping execution ", exeId);
+      return true;
+    } else {
+      lastErrorMsg = "Trying to remove a non-existing SQL execution " + exeId;
+      LOG.warn(lastErrorMsg);
+      return false;
+    }
+  }
+
+  @Override
+  public ExecutionStatus queryExecutionStatus(int execId) {
+    SamzaSqlApplicationRunner runner = executions.get(execId);
+    if (runner == null) {
+      return null;
+    }
+    return queryExecutionStatus(runner);
+  }
+
+  @Override
+  public String getErrorMsg() {
+    return lastErrorMsg;
+  }
+
+  @Override
+  public List<SqlFunction> listFunctions(ExecutionContext context) {
+    /**
+     * TODO: currently the Shell only shows some UDFs supported by Samza 
internally. We may need to require UDFs
+     *       to provide a function of getting their "SamzaSqlUdfDisplayInfo", 
then we can get the UDF information from
+     *       SamzaSqlApplicationConfig.udfResolver(or 
SamzaSqlApplicationConfig.udfMetadata) instead of registering
+     *       UDFs one by one as below.
+     */
+    List<SqlFunction> udfs = new ArrayList<>();
+    udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to 
the regex",
+            
Arrays.asList(SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING),
+                    
SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING)),
+            
SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN)));
+
+    return udfs;
+  }
+
+  static void saveOutputMessage(OutgoingMessageEnvelope messageEnvelope) {
+    outputData.add(messageEnvelope);
+  }
+
+  static Map<String, String> fetchSamzaSqlConfig(int execId, ExecutionContext 
executionContext) {
+    HashMap<String, String> staticConfigs = new HashMap<>();
+
+    staticConfigs.put(JobConfig.JOB_NAME(), "sql-job-" + execId);
+    staticConfigs.put(JobConfig.PROCESSOR_ID(), String.valueOf(execId));
+    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
+    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+    String configIOResolverDomain =
+        
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, 
"config");
+    staticConfigs.put(configIOResolverDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedIOResolverFactory.class.getName());
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
+    String configUdfResolverDomain = 
String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
+    staticConfigs.put(configUdfResolverDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedUdfResolver.class.getName());
+    staticConfigs.put(configUdfResolverDomain + 
ConfigBasedUdfResolver.CFG_UDF_CLASSES,
+        Joiner.on(",").join(RegexMatchUdf.class.getName(), 
FlattenUdf.class.getName()));
+
+    staticConfigs.put("serializers.registry.string.class", 
StringSerdeFactory.class.getName());
+    staticConfigs.put("serializers.registry.avro.class", 
AvroSerDeFactory.class.getName());
+    staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, 
ProfileChangeEvent.SCHEMA$.toString());
+
+    String kafkaSystemConfigPrefix =
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, 
SAMZA_SYSTEM_KAFKA);
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", SAMZA_SYSTEM_KAFKA);
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", 
KafkaSystemFactory.class.getName());
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string");
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro");
+    staticConfigs.put(kafkaSystemConfigPrefix + "consumer.zookeeper.connect", 
"localhost:2181");
+    staticConfigs.put(kafkaSystemConfigPrefix + "producer.bootstrap.servers", 
"localhost:9092");
+
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true");
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", 
"oldest");
+
+    staticConfigs.put(avroSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String logSystemConfigPrefix =
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, 
SAMZA_SYSTEM_LOG);
+    String logSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", SAMZA_SYSTEM_LOG);
+    staticConfigs.put(logSystemConfigPrefix + "samza.factory", 
CliLoggingSystemFactory.class.getName());
+    staticConfigs.put(logSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json");
+    staticConfigs.put(logSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String avroSamzaToRelMsgConverterDomain =
+        
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, 
"avro");
+
+    staticConfigs.put(avroSamzaToRelMsgConverterDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
+        AvroSchemaGenRelConverterFactory.class.getName());
+
+    String jsonSamzaToRelMsgConverterDomain =
+        
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, 
"json");
+
+    staticConfigs.put(jsonSamzaToRelMsgConverterDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
+        JsonRelConverterFactory.class.getName());
+
+    String configAvroRelSchemaProviderDomain =
+        
String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, 
"config");
+    staticConfigs.put(configAvroRelSchemaProviderDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
+        FileSystemAvroRelSchemaProviderFactory.class.getName());
+
+    staticConfigs.put(
+        configAvroRelSchemaProviderDomain + 
FileSystemAvroRelSchemaProviderFactory.CFG_SCHEMA_DIR,
+        "/tmp/schemas/");
+
+    /* TODO: we need to validate and read configurations from 
shell-defaults.conf (aka. "executionContext"),
+     *       and update their value if they've been included in staticConfigs. 
We could handle these logic
+     *       Shell level, or in Executor level.
+     */
+    staticConfigs.putAll(executionContext.getConfigMap());
+
+    return staticConfigs;
+  }
+
+  private List<String> formatSqlStmts(List<String> statements) {
+    return statements.stream().map(sql -> {
+      if (!sql.toLowerCase().startsWith("insert")) {
+        String formattedSql = String.format("insert into log.outputStream %s", 
sql);
+        LOG.debug("Sql formatted. ", sql, formattedSql);
+        return formattedSql;
+      } else {
+        return sql;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private void validateExecutedStmts(List<String> statements, List<String> 
submittedStmts,
+                                     List<String> nonSubmittedStmts) {
+    for (String sql : statements) {
+      if (sql.isEmpty()) {
+        continue;
+      }
+      if (!sql.toLowerCase().startsWith("insert")) {
+        nonSubmittedStmts.add(sql);
+      } else {
+        submittedStmts.add(sql);
+      }
+    }
+  }
+
+  SqlSchema generateResultSchema(Config config) {
+    SamzaSqlDslConverter converter = (SamzaSqlDslConverter) new 
SamzaSqlDslConverterFactory().create(config);
+    RelRoot relRoot = converter.convertDsl("").iterator().next();
+
+    List<String> colNames = new ArrayList<>();
+    List<String> colTypeNames = new ArrayList<>();
+    for (RelDataTypeField dataTypeField : 
relRoot.validatedRowType.getFieldList()) {
+      colNames.add(dataTypeField.getName());
+      colTypeNames.add(dataTypeField.getType().toString());
+    }
+
+    return new SqlSchema(colNames, colTypeNames);
+  }
+
+  private String[] getFormattedRow(ExecutionContext context, 
OutgoingMessageEnvelope row) {
+    String[] formattedRow = new String[1];
+    String outputFormat = context.getConfigMap().get(SAMZA_SQL_OUTPUT);
+    if (outputFormat == null || 
!outputFormat.equalsIgnoreCase(MessageFormat.PRETTY.toString())) {
+      formattedRow[0] = getCompressedFormat(row);
+    } else {
+      formattedRow[0] = getPrettyFormat(row);
+    }
+    return formattedRow;
+  }
+
+  private ExecutionStatus queryExecutionStatus(SamzaSqlApplicationRunner 
runner) {
+    lastErrorMsg = "";
+    switch (runner.status().getStatusCode()) {
+      case New:
+        return ExecutionStatus.New;
+      case Running:
+        return ExecutionStatus.Running;
+      case SuccessfulFinish:
+        return ExecutionStatus.SuccessfulFinish;
+      case UnsuccessfulFinish:
+        return ExecutionStatus.UnsuccessfulFinish;
+      default:
+        lastErrorMsg = String.format("Unsupported execution status %s",
+                runner.status().getStatusCode().toString());
+        return null;
+    }
+  }
+
+  private String getPrettyFormat(OutgoingMessageEnvelope envelope) {
+    String value = new String((byte[]) envelope.getMessage());
+    ObjectMapper mapper = new ObjectMapper();
+    String formattedValue;
+    try {
+      Object json = mapper.readValue(value, Object.class);
+      formattedValue = 
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+    } catch (IOException e) {
+      formattedValue = value;
+      LOG.error("Error while formatting json", e);
+    }
+    return formattedValue;
+  }
+
+  private String getCompressedFormat(OutgoingMessageEnvelope envelope) {
+    return new String((byte[]) envelope.getMessage());
+  }
+
+  private enum MessageFormat {
+    PRETTY,
+    COMPACT
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
new file mode 100644
index 0000000..ed11a53
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+
+/**
+ * Types of Samza Sql fields.
+ */
+public class SamzaSqlFieldType {
+
+  private TypeName typeName;
+  private SamzaSqlFieldType elementType;
+  private SamzaSqlFieldType valueType;
+  private SqlSchema rowSchema;
+
+  private SamzaSqlFieldType(TypeName typeName, SamzaSqlFieldType elementType, 
SamzaSqlFieldType valueType, SqlSchema rowSchema) {
+    this.typeName = typeName;
+    this.elementType = elementType;
+    this.valueType = valueType;
+    this.rowSchema = rowSchema;
+  }
+
+  public static SamzaSqlFieldType createPrimitiveFieldType(TypeName typeName) {
+    return new SamzaSqlFieldType(typeName, null, null, null);
+  }
+
+  public static SamzaSqlFieldType createArrayFieldType(SamzaSqlFieldType 
elementType) {
+    return new SamzaSqlFieldType(TypeName.ARRAY, elementType, null, null);
+  }
+
+  public static SamzaSqlFieldType createMapFieldType(SamzaSqlFieldType 
valueType) {
+    return new SamzaSqlFieldType(TypeName.MAP, null, valueType, null);
+  }
+
+  public static SamzaSqlFieldType createRowFieldType(SqlSchema rowSchema) {
+    return new SamzaSqlFieldType(TypeName.ROW, null, null, rowSchema);
+  }
+
+  public boolean isPrimitiveField() {
+    return typeName != TypeName.ARRAY && typeName != TypeName.MAP && typeName 
!= TypeName.ROW;
+  }
+
+  public TypeName getTypeName() {
+    return typeName;
+  }
+
+  public SamzaSqlFieldType getElementType() {
+    return elementType;
+  }
+
+  public SamzaSqlFieldType getValueType() {
+    return valueType;
+  }
+
+  public SqlSchema getRowSchema() {
+    return rowSchema;
+  }
+
+  public enum TypeName {
+    BYTE, // One-byte signed integer.
+    INT16, // two-byte signed integer.
+    INT32, // four-byte signed integer.
+    INT64, // eight-byte signed integer.
+    DECIMAL, // Decimal integer
+    FLOAT,
+    DOUBLE,
+    STRING, // String.
+    DATETIME, // Date and time.
+    BOOLEAN, // Boolean.
+    BYTES, // Byte array.
+    ARRAY,
+    MAP,
+    ROW, // The field is itself a nested row.
+    ANY
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
new file mode 100644
index 0000000..2e89978
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import org.apache.samza.sql.client.interfaces.SqlFunction;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * UDF information displayer
+ */
+public class SamzaSqlUdfDisplayInfo implements SqlFunction {
+
+  private String name;
+
+  private String description;
+
+  private List<SamzaSqlFieldType> argumentTypes;
+
+  private SamzaSqlFieldType returnType;
+
+  public SamzaSqlUdfDisplayInfo(String name, String description, 
List<SamzaSqlFieldType> argumentTypes,
+                                SamzaSqlFieldType returnType) {
+    this.name = name;
+    this.description = description;
+    this.argumentTypes = argumentTypes;
+    this.returnType = returnType;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public List<String> getArgumentTypes() {
+    return argumentTypes.stream().map(x -> 
x.getTypeName().toString()).collect(Collectors.toList());
+  }
+
+  public String getReturnType() {
+    return returnType.getTypeName().toString();
+  }
+
+  public String toString() {
+    List<String> argumentTypeNames =
+            argumentTypes.stream().map(x -> 
x.getTypeName().toString()).collect(Collectors.toList());
+    String args = Joiner.on(", ").join(argumentTypeNames);
+    return String.format("%s(%s) returns <%s> : %s", name, args, 
returnType.getTypeName().toString(), description);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
new file mode 100644
index 0000000..14dfa64
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.Map;
+
+/**
+ * Whenever the shell calls the executor to execute a SQL statement, an object 
of ExecutionContext is passed.
+ */
+public class ExecutionContext {
+  private Map<String, String> m_configs;
+
+  public ExecutionContext(Map<String, String> config) {
+    m_configs = config;
+  }
+
+  /**
+  * @return The Map storing all configuration pairs. Note that the set map is 
the same as the one used by
+  * ExecutionContext, so changes to the map are reflected in ExecutionContext, 
and vice-versa.
+  */
+  public Map<String, String> getConfigMap() {
+    return m_configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
new file mode 100644
index 0000000..5991922
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * An executor shall throw an ExecutionException when it encounters an 
unrecoverable error.
+ */
+public class ExecutionException extends RuntimeException {
+  public ExecutionException() {
+  }
+
+  public ExecutionException(String message) {
+    super(message);
+  }
+
+  public ExecutionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ExecutionException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
new file mode 100644
index 0000000..77ee888
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * Status of the execution of a SQL statement.
+ */
+public enum ExecutionStatus {
+  New,
+  Running,
+  SuccessfulFinish,
+  UnsuccessfulFinish
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
new file mode 100644
index 0000000..0173ad1
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.List;
+
+/**
+ * Result of a non-query SQL statement or SQL file containing multiple 
non-query statements.
+ */
+public class NonQueryResult {
+  private int execId; // execution ID of the statement(s) submitted
+  private boolean success; // whether the statement(s) submitted successfully
+
+  // When user submits a batch of SQL statements, only the non-query ones will 
be submitted
+  private List<String> submittedStmts;
+  private List<String> nonSubmittedStmts;
+
+  public NonQueryResult(int execId, boolean success) {
+    this.execId = execId;
+    this.success = success;
+  }
+
+  public NonQueryResult(int execId, boolean success, List<String> 
submittedStmts, List<String> nonSubmittedStmts) {
+    this.execId = execId;
+    this.success = success;
+    this.submittedStmts = submittedStmts;
+    this.nonSubmittedStmts = nonSubmittedStmts;
+  }
+
+  public int getExecutionId() {
+    return execId;
+  }
+
+  public boolean succeeded() {
+    return success;
+  }
+
+  public List<String> getSubmittedStmts() {
+    return submittedStmts;
+  }
+
+  public List<String> getNonSubmittedStmts() {
+    return nonSubmittedStmts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
new file mode 100644
index 0000000..6f54557
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * Execution result of a SELECT statement. It doesn't contain data though.
+ */
+public class QueryResult {
+  private int execId; // execution ID of the statement(s) submitted
+  private boolean success; // whether the statement(s) submitted successfully
+  private SqlSchema schema; // The schema of the data coming from the query
+
+  public QueryResult(int execId, SqlSchema schema, Boolean success) {
+    if (success && schema == null)
+      throw new IllegalArgumentException();
+    this.execId = execId;
+    this.schema = schema;
+    this.success = success;
+  }
+
+  public int getExecutionId() {
+    return execId;
+  }
+
+  public SqlSchema getSchema() {
+    return schema;
+  }
+
+  public boolean succeeded() {
+    return success;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
new file mode 100644
index 0000000..9d528f6
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+
+import java.io.File;
+import java.util.List;
+
+
+/**
+ * Conventions:
+ * <p>
+ * Implementations shall report UNRECOVERABLE EXCEPTIONS by throwing
+ * ExecutionExceptions, though SqlExecutor doesn't enforce this by as we don't 
believe in
+ * Java checked exceptions. Report errors by returning values as indicated by 
each
+ * function and preparing for the subsequent getErrorMsg call.
+ * <p>
+ * Each execution (both query and non-query shall return an non-negative 
execution ID(execId).
+ * Negative execution IDs are reserved for error handling.
+ * <p>
+ * User shall be able to query the status of an execution even after it's 
finished, so the
+ * executor shall keep record of all the execution unless being asked to 
remove them (
+ * when removeExecution is called.)
+ * <p>
+ * IMPORTANT: An executor shall support two ways of supplying data:
+ * 1. Say user selects profiles of users who visited LinkedIn in the last 5 
mins. There could
+ * be millions of rows, but the UI only need to show a small fraction. That's 
retrieveQueryResult,
+ * accepting a row range (startRow and endRow). Note that UI may ask for the 
same data over and over,
+ * like when user switches from page 1 to page 2 and data stream changes at 
the same time, the two
+ * pages may actually have overlapped or even same data.
+ * <p>
+ * 2. Say user wants to see clicks on a LinkedIn page of certain person from 
now on. In this mode
+ * consumeQueryResult shall be used. UI can keep asking for new rows, and once 
the rows are consumed,
+ * it's no longer necessary for the executor to keep them. If lots of rows 
come in, the UI may be only
+ * interested in the last certain rows (as it's in a logview mode), so all 
data older can be dropped.
+ */
+public interface SqlExecutor {
+  /**
+   * SqlExecutor shall be ready to accept all other calls after start() is 
called.
+   * However, it shall NOT store the ExecutionContext for future use, as each
+   * call will be given an ExecutionContext which may differ from this one.
+   *
+   * @param context The ExecutionContext at the time of the call.
+   */
+  public void start(ExecutionContext context);
+
+  /**
+   * Indicates no further calls will be made thus it's safe for the executor 
to clean up.
+   *
+   * @param context The ExecutionContext at the time of the call.
+   */
+  public void stop(ExecutionContext context);
+
+  /**
+   * @param context The ExecutionContext at the time of the call.
+   * @return null if an error occurs. Prepare for subsequent getErrorMsg call.
+   * an empty list indicates no tables found.
+   */
+  public List<String> listTables(ExecutionContext context);
+
+  /**
+   * @param context   The ExecutionContext at the time of the call.
+   * @param tableName Name of the table to get the schema for.
+   * @return null if an error occurs. Prepare for subsequent getErrorMsg call.
+   */
+  public SqlSchema getTableSchema(ExecutionContext context, String tableName);
+
+  /**
+   * @param context   The ExecutionContext at the time of the call.
+   * @param statement statement to execute
+   * @return The query result.
+   */
+  public QueryResult executeQuery(ExecutionContext context, String statement);
+
+
+  /**
+   * @return how many rows available for reading.
+   */
+  public int getRowCount();
+
+  /**
+   * Row starts at 0. Executor shall keep the data retrieved.
+   * For now we get strings for display but we might want strong typed values.
+   *
+   * @param context  The ExecutionContext at the time of the call.
+   * @param startRow Start row index (inclusive)
+   * @param endRow   End row index (inclusive)
+   * @return A list of row data represented by a String array.
+   */
+  public List<String[]> retrieveQueryResult(ExecutionContext context, int 
startRow, int endRow);
+
+
+  /**
+   * Consumes rows from query result. Executor shall drop them, as "consume" 
indicates.
+   * ALL data before endRow (inclusive, including data before startRow) shall 
be deleted.
+   *
+   * @param context  The ExecutionContext at the time of the call.
+   * @param startRow Start row index (inclusive)
+   * @param endRow   End row index (inclusive)
+   * @return available data between startRow and endRow (both are inclusive)
+   */
+  public List<String[]> consumeQueryResult(ExecutionContext context, int 
startRow, int endRow);
+
+  /**
+   * Executes all the NON-QUERY statements in the sqlFile.
+   * Query statements are ignored as it won't make sense.
+   *
+   * @param context The ExecutionContext at the time of the call.
+   * @param file    A File object to read statements from.
+   * @return Execution result.
+   */
+  public NonQueryResult executeNonQuery(ExecutionContext context, File file);
+
+  /**
+   * @param context    The ExecutionContext at the time of the call.
+   * @param statements A list of non-query sql statements.
+   * @return Execution result.
+   */
+  public NonQueryResult executeNonQuery(ExecutionContext context, List<String> 
statements);
+
+  /**
+   * @param context The ExecutionContext at the time of the call.
+   * @param exeId   Execution ID.
+   * @return Whether the operation suceeded or not.
+   */
+  public boolean stopExecution(ExecutionContext context, int exeId);
+
+
+  /**
+   * Removing an ongoing execution shall result in an error. Stop it first.
+   *
+   * @param context The ExecutionContext at the time of the call
+   * @param exeId   Execution ID.
+   * @return Whether the operation succeeded or not.
+   */
+  public boolean removeExecution(ExecutionContext context, int exeId);
+
+  /**
+   * @param execId Execution ID.
+   * @return ExecutionStatus.
+   */
+  public ExecutionStatus queryExecutionStatus(int execId);
+
+  /**
+   * @return The last error message of last function call.
+   */
+  public String getErrorMsg();
+
+  /**
+   * @param context The ExecutionContext at the time of the call.
+   * @return A list of SqlFunction.
+   */
+  List<SqlFunction> listFunctions(ExecutionContext context);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
new file mode 100644
index 0000000..3ac602c
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.List;
+
+/**
+ * Represents a SQL function.
+ */
+public interface SqlFunction {
+  /**
+   * Gets the name of the function.
+   * @return name of the function
+   */
+  public String getName();
+
+  /**
+   * Gets the description of the function.
+   * @return description of the function.
+   */
+  public String getDescription();
+
+  /**
+   * Gets the argument types of the function as a List.
+   * @return A list containing the type names of the arguments.
+   */
+  public List<String> getArgumentTypes();
+
+  /**
+   * Gets the return type of the function.
+   * @return return type name
+   */
+  public String getReturnType();
+
+  /**
+   * Don't forget to implement toString()
+   */
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
new file mode 100644
index 0000000..7dcabdb
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+
+import java.util.List;
+
+/**
+ * A primitive representation of SQL schema which is just for display purpose.
+ */
+public class SqlSchema {
+  private String[] names; // field names
+  private String[] typeNames; // names of field type
+
+  public SqlSchema(List<String> colNames, List<String> colTypeNames) {
+    if (colNames == null || colNames.size() == 0
+            || colTypeNames == null || colTypeNames.size() == 0
+            || colNames.size() != colTypeNames.size())
+      throw new IllegalArgumentException();
+
+    names = new String[colNames.size()];
+    names = colNames.toArray(names);
+
+    typeNames = new String[colTypeNames.size()];
+    typeNames = colTypeNames.toArray(typeNames);
+  }
+
+  public int getFieldCount() {
+    return names.length;
+  }
+
+  public String getFieldName(int colIdx) {
+    return names[colIdx];
+  }
+
+  public String getFieldTypeName(int colIdx) {
+    return typeNames[colIdx];
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
new file mode 100644
index 0000000..84e8a0e
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Convenient class for building a SqlSchema object.
+ */
+public class SqlSchemaBuilder {
+  private List<String> names = new ArrayList<>();
+  private List<String> typeNames = new ArrayList<>();
+
+  private SqlSchemaBuilder() {
+  }
+
+  public static SqlSchemaBuilder builder() {
+    return new SqlSchemaBuilder();
+  }
+
+  public SqlSchemaBuilder addField(String name, String fieldType) {
+    if (name == null || name.isEmpty() || fieldType == null)
+      throw new IllegalArgumentException();
+
+    names.add(name);
+    typeNames.add(fieldType);
+    return this;
+  }
+
+  public SqlSchemaBuilder appendFields(List<String> names, List<String> 
typeNames) {
+    if (names == null || names.size() == 0
+            || typeNames == null || typeNames.size() == 0
+            || names.size() != typeNames.size())
+      throw new IllegalArgumentException();
+
+    this.names.addAll(names);
+    this.typeNames.addAll(typeNames);
+
+    return this;
+  }
+
+  public SqlSchema toSchema() {
+    return new SqlSchema(names, typeNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
new file mode 100755
index 0000000..743ff44
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+/**
+ * The exception used by the shell for unrecoverable errors.
+ */
+public class CliException extends RuntimeException {
+  public CliException() {
+
+  }
+
+  public CliException(String message) {
+    super(message);
+  }
+
+  public CliException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public CliException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
new file mode 100755
index 0000000..03fe1b1
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+/**
+ * Convenient utility class with static methods.
+ */
+public class CliUtil {
+  public static boolean isNullOrEmpty(String str) {
+    return str == null || str.isEmpty();
+  }
+
+  public static int ceilingDiv(int x, int y) {
+    if (x < 0 || y <= 0)
+      throw new IllegalArgumentException();
+
+    return x / y + (x % y == 0 ? 0 : 1);
+  }
+
+  public static StringBuilder appendTo(StringBuilder builder, int toPos, char 
c) {
+    for (int i = builder.length(); i <= toPos; ++i) {
+      builder.append(c);
+    }
+    return builder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
new file mode 100644
index 0000000..789d64c
--- /dev/null
+++ 
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A queue that supports random access and consumption.
+ * @param <T> Element type
+ */
+public class RandomAccessQueue<T> {
+  private T[] buffer;
+  private int capacity;
+  private int size;
+  private int head;
+
+  public RandomAccessQueue(Class<T> t, int capacity) {
+    this.capacity = capacity;
+    head = 0;
+    size = 0;
+
+    @SuppressWarnings("unchecked") final T[] b = (T[]) Array.newInstance(t, 
capacity);
+    buffer = b;
+  }
+
+  public synchronized List<T> get(int start, int end) {
+    int lowerBound = Math.max(start, 0);
+    int upperBound = Math.min(end, size - 1);
+    List<T> rets = new ArrayList<>();
+    for (int i = lowerBound; i <= upperBound; i++) {
+      rets.add(buffer[(head + i) % capacity]);
+    }
+    return rets;
+  }
+
+  public synchronized T get(int index) {
+    if (index >= 0 && index < size) {
+      return buffer[(head + index) % capacity];
+    }
+    throw new CliException("OutOfBoundaryError");
+  }
+
+  public synchronized void add(T t) {
+    if (size >= capacity) {
+      buffer[head] = t;
+      head = (head + 1) % capacity;
+    } else {
+      int pos = (head + size) % capacity;
+      buffer[pos] = t;
+      size++;
+    }
+  }
+
+  /*
+   * Remove all element before 'end', and return elements between 'start' and 
'end'
+   */
+  public synchronized List<T> consume(int start, int end) {
+    List<T> rets = get(start, end);
+    int upperBound = Math.min(end, size - 1);
+    head = (end + 1) % capacity;
+    size -= (upperBound + 1);
+    return rets;
+  }
+
+  public synchronized int getHead() {
+    return head;
+  }
+
+  public synchronized int getSize() {
+    return size;
+  }
+
+  public synchronized void clear() {
+    head = 0;
+    size = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
 
b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
new file mode 100644
index 0000000..18fe4b7
--- /dev/null
+++ 
b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.sql.client.impl.SamzaExecutor.*;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
+
+
+public class SamzaExecutorTest {
+    private SamzaExecutor m_executor = new SamzaExecutor();
+
+    @Test
+    public void testGetTableSchema() {
+        ExecutionContext context = getExecutionContext();
+        SqlSchema ts = m_executor.getTableSchema(context, 
"kafka.ProfileChangeStream");
+
+        Assert.assertEquals("Name", ts.getFieldName(0));
+        Assert.assertEquals("NewCompany", ts.getFieldName(1));
+        Assert.assertEquals("OldCompany", ts.getFieldName(2));
+        Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(3));
+        Assert.assertEquals("STRING", ts.getFieldTypeName(0));
+        Assert.assertEquals("STRING", ts.getFieldTypeName(1));
+        Assert.assertEquals("STRING", ts.getFieldTypeName(2));
+        Assert.assertEquals("INT64", ts.getFieldTypeName(3));
+    }
+
+    @Test
+    public void testGenerateResultSchema() {
+        ExecutionContext context = getExecutionContext();
+        Map<String, String> mapConf = fetchSamzaSqlConfig(1, context);
+        SqlSchema ts = m_executor.generateResultSchema(new MapConfig(mapConf));
+
+        Assert.assertEquals("__key__", ts.getFieldName(0));
+        Assert.assertEquals("Name", ts.getFieldName(1));
+        Assert.assertEquals("NewCompany", ts.getFieldName(2));
+        Assert.assertEquals("OldCompany", ts.getFieldName(3));
+        Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3));
+        Assert.assertEquals("BIGINT", ts.getFieldTypeName(4));
+    }
+
+    private ExecutionContext getExecutionContext() {
+        ClassLoader classLoader = getClass().getClassLoader();
+        File file = new 
File(classLoader.getResource("ProfileChangeStream.avsc").getFile());
+        Map<String, String> mapConf = new HashMap<>();
+        mapConf.put("samza.sql.relSchemaProvider.config.schemaDir", 
file.getParent());
+        mapConf.put(CFG_SQL_STMT, "insert into log.outputStream select * from 
kafka.ProfileChangeStream");
+        return new ExecutionContext(mapConf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
 
b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
new file mode 100644
index 0000000..fe7a19a
--- /dev/null
+++ 
b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class RandomAccessQueueTest {
+  private RandomAccessQueue m_queue;
+  public RandomAccessQueueTest() {
+    m_queue = new RandomAccessQueue<>(Integer.class, 5);
+  }
+
+  @Test
+  public void testAddAndGetElement() {
+    m_queue.clear();
+    for (int i = 0; i < 4; i++) {
+      m_queue.add(i);
+    }
+    Assert.assertEquals(0, m_queue.getHead());
+    Assert.assertEquals(4, m_queue.getSize());
+    Assert.assertEquals(0, m_queue.get(0));
+    Assert.assertEquals(3, m_queue.get(3));
+
+    for (int i = 0; i < 3; i++) {
+      m_queue.add(4 + i);
+    }
+    int head = m_queue.getHead();
+    Assert.assertEquals(2, head);
+    Assert.assertEquals(5, m_queue.getSize());
+    Assert.assertEquals(2, m_queue.get(0));
+    Assert.assertEquals(3, m_queue.get(1));
+    Assert.assertEquals(4, m_queue.get(2));
+    Assert.assertEquals(5, m_queue.get(3));
+    Assert.assertEquals(6, m_queue.get(4));
+  }
+
+  @Test
+  public void testGetRange() {
+    m_queue.clear();
+    for (int i = 0; i < 4; i++) {
+      m_queue.add(i); // 0, 1, 2, 3
+    }
+    List<Integer> rets = m_queue.get(-1, 9);
+    Assert.assertEquals(4, rets.size());
+    Assert.assertEquals(0, m_queue.get(0));
+    Assert.assertEquals(3, m_queue.get(3));
+
+    for (int i = 0; i <= 2; i++) {
+      m_queue.add(4 + i);
+    }
+    rets = m_queue.get(0, 4);
+    Assert.assertEquals(2, rets.get(0).intValue());
+    Assert.assertEquals(3, rets.get(1).intValue());
+    Assert.assertEquals(4, rets.get(2).intValue());
+    Assert.assertEquals(5, rets.get(3).intValue());
+    Assert.assertEquals(6, rets.get(4).intValue());
+  }
+
+  @Test
+  public void testConsume() {
+    m_queue.clear();
+    for (int i = 0; i < 4; i++) {
+      m_queue.add(i); // 0, 1, 2, 3
+    }
+    List<Integer> rets = m_queue.consume(1, 2);
+    Assert.assertEquals(1, m_queue.getSize());
+    Assert.assertEquals(3, m_queue.getHead());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc 
b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
new file mode 100644
index 0000000..5c1e49d
--- /dev/null
+++ b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "ProfileChangeEvent",
+    "version" : 1,
+    "namespace": "com.linkedin.samza.tools.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "Name",
+            "doc": "Name of the profile.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "NewCompany",
+            "doc": "Name of the new company the person joined.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "OldCompany",
+            "doc": "Name of the old company the person was working.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "ProfileChangeTimestamp",
+            "doc": "Time at which the profile was changed.",
+            "type": ["null", "long"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 17df373..745c934 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -143,7 +143,7 @@ public class SamzaSqlApplicationConfig {
     });
   }
 
-  private static <T> T initializePlugin(String pluginName, String plugin, 
Config staticConfig,
+  public static <T> T initializePlugin(String pluginName, String plugin, 
Config staticConfig,
       String pluginDomainFormat, BiFunction<Object, Config, T> factoryInvoker) 
{
     String pluginDomain = String.format(pluginDomainFormat, plugin);
     Config pluginConfig = staticConfig.subset(pluginDomain);

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 853a7bb..5b9df5c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -27,6 +27,7 @@ include \
   'samza-rest',
   'samza-shell',
   'samza-sql',
+  'samza-sql-shell',
   'samza-tools'
 
 def scalaModules = [

Reply via email to