http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index a0bd45f..8e6f687 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -31,9 +31,12 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,37 +49,37 @@ class ProjectTranslator {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ProjectTranslator.class);
 
-  void translate(final Project project, final TranslatorContext context) {
-    MessageStream<SamzaSqlRelMessage> messageStream = 
context.getMessageStream(project.getInput().getId());
-    List<Integer> flattenProjects =
-        
project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
+  private static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+    private transient Project project;
+    private transient Expression expr;
+    private transient TranslatorContext context;
 
-    if (flattenProjects.size() > 0) {
-      if (flattenProjects.size() > 1) {
-        String msg = "Multiple flatten operators in a single query is not 
supported";
-        LOG.error(msg);
-        throw new SamzaException(msg);
-      }
+    private final int projectId;
 
-      messageStream = translateFlatten(flattenProjects.get(0), messageStream);
+    ProjectMapFunction(int projectId) {
+      this.projectId = projectId;
     }
 
-    Expression expr = 
context.getExpressionCompiler().compile(project.getInputs(), 
project.getProjects());
+    @Override
+    public void init(Config config, TaskContext taskContext) {
+      this.context = (TranslatorContext) taskContext.getUserContext();
+      this.project = (Project) this.context.getRelNode(projectId);
+      this.expr = this.context.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
+    }
 
-    MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(m -> {
+    @Override
+    public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
       RelDataType type = project.getRowType();
       Object[] output = new Object[type.getFieldCount()];
       expr.execute(context.getExecutionContext(), context.getDataContext(),
-          m.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
+          message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
       List<String> names = new ArrayList<>();
       for (int index = 0; index < output.length; index++) {
         names.add(index, project.getNamedProjects().get(index).getValue());
       }
 
       return new SamzaSqlRelMessage(names, Arrays.asList(output));
-    });
-
-    context.registerMessageStream(project.getId(), outputStream);
+    }
   }
 
   private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer 
flattenIndex,
@@ -106,4 +109,27 @@ class ProjectTranslator {
   private Integer getProjectIndex(RexNode rexNode) {
     return ((RexInputRef) ((RexCall) rexNode).getOperands().get(0)).getIndex();
   }
+
+  void translate(final Project project, final TranslatorContext context) {
+    MessageStream<SamzaSqlRelMessage> messageStream = 
context.getMessageStream(project.getInput().getId());
+    List<Integer> flattenProjects =
+        
project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
+
+    if (flattenProjects.size() > 0) {
+      if (flattenProjects.size() > 1) {
+        String msg = "Multiple flatten operators in a single query is not supported";
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+      messageStream = translateFlatten(flattenProjects.get(0), messageStream);
+    }
+
+    final int projectId = project.getId();
+
+    MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new ProjectMapFunction(projectId));
+
+    context.registerMessageStream(project.getId(), outputStream);
+    context.registerRelNode(project.getId(), project);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index eda73a7..1db3000 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.translator;
 
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.calcite.rel.RelNode;
@@ -29,11 +30,14 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
@@ -43,6 +47,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +63,33 @@ public class QueryTranslator {
 
   private final ScanTranslator scanTranslator;
   private final SamzaSqlApplicationConfig sqlConfig;
+  private final Map<String, SamzaRelConverter> converters;
+
+  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
+    private transient SamzaRelConverter samzaMsgConverter;
+    private final String outputTopic;
+
+    OutputMapFunction(String outputTopic) {
+      this.outputTopic = outputTopic;
+    }
+
+    @Override
+    public void init(Config config, TaskContext taskContext) {
+      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
+      this.samzaMsgConverter = context.getMsgConverter(outputTopic);
+    }
+
+    @Override
+    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
+      return this.samzaMsgConverter.convertToSamzaMessage(message);
+    }
+  }
 
   public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
     this.sqlConfig = sqlConfig;
     scanTranslator =
         new ScanTranslator(sqlConfig.getSamzaRelConverters(), 
sqlConfig.getInputSystemStreamConfigBySource());
+    this.converters = sqlConfig.getSamzaRelConverters();
   }
 
   public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph 
streamGraph) {
@@ -71,7 +98,7 @@ public class QueryTranslator {
             sqlConfig.getUdfMetadata());
     final SamzaSqlExecutionContext executionContext = new 
SamzaSqlExecutionContext(this.sqlConfig);
     final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
-    final TranslatorContext context = new TranslatorContext(streamGraph, 
relRoot, executionContext);
+    final TranslatorContext context = new TranslatorContext(streamGraph, 
relRoot, executionContext, this.converters);
     final RelNode node = relRoot.project();
     final SqlIOResolver ioResolver = 
context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
 
@@ -119,9 +146,8 @@ public class QueryTranslator {
 
     String sink = queryInfo.getSink();
     SqlIOConfig sinkConfig = 
sqlConfig.getOutputSystemStreamConfigsBySource().get(sink);
-    SamzaRelConverter samzaMsgConverter = 
sqlConfig.getSamzaRelConverters().get(queryInfo.getSink());
     MessageStreamImpl<SamzaSqlRelMessage> stream = 
(MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
-    MessageStream<KV<Object, Object>> outputStream = 
stream.map(samzaMsgConverter::convertToSamzaMessage);
+    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sink));
 
     Optional<TableDescriptor> tableDescriptor = 
sinkConfig.getTableDescriptor();
     if (!tableDescriptor.isPresent()) {
@@ -135,5 +161,19 @@ public class QueryTranslator {
       }
       outputStream.sendTo(outputTable);
     }
+
+    streamGraph.withContextManager(new ContextManager() {
+      @Override
+      public void init(Config config, TaskContext taskContext) {
+        taskContext.setUserContext(context.clone());
+      }
+
+      @Override
+      public void close() {
+
+      }
+
+    });
+
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
index df88a7c..889ea97 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
@@ -44,20 +44,20 @@ public class SamzaSqlRelMessageJoinFunction
 
   private final JoinRelType joinRelType;
   private final boolean isTablePosOnRight;
-  private final List<Integer> streamFieldIds;
+  private final ArrayList<Integer> streamFieldIds;
   // Table field names are used in the outer join when the table record is not 
found.
-  private final List<String> tableFieldNames;
-  private final List<String> outFieldNames;
+  private final ArrayList<String> tableFieldNames;
+  private final ArrayList<String> outFieldNames;
 
-  public SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean 
isTablePosOnRight,
+  SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean 
isTablePosOnRight,
       List<Integer> streamFieldIds, List<String> streamFieldNames, 
List<String> tableFieldNames) {
     this.joinRelType = joinRelType;
     this.isTablePosOnRight = isTablePosOnRight;
     Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && 
isTablePosOnRight) ||
         (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) 
||
         joinRelType.compareTo(JoinRelType.INNER) == 0);
-    this.streamFieldIds = streamFieldIds;
-    this.tableFieldNames = tableFieldNames;
+    this.streamFieldIds = new ArrayList<>(streamFieldIds);
+    this.tableFieldNames = new ArrayList<>(tableFieldNames);
     this.outFieldNames = new ArrayList<>();
     if (isTablePosOnRight) {
       outFieldNames.addAll(streamFieldNames);

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 1f9ed52..fa3d9d3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -23,11 +23,14 @@ import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 
 
@@ -45,17 +48,36 @@ class ScanTranslator {
     this.systemStreamConfig = ssc;
   }
 
+  private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> {
+    private transient SamzaRelConverter msgConverter;
+    private final String streamName;
+
+    ScanMapFunction(String sourceStreamName) {
+      this.streamName = sourceStreamName;
+    }
+
+    @Override
+    public void init(Config config, TaskContext taskContext) {
+      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
+      this.msgConverter = context.getMsgConverter(streamName);
+    }
+
+    @Override
+    public SamzaSqlRelMessage apply(KV<Object, Object> message) {
+      return this.msgConverter.convertToRelMessage(message);
+    }
+  }
+
   void translate(final TableScan tableScan, final TranslatorContext context) {
     StreamGraph streamGraph = context.getStreamGraph();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
     String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
 
     Validate.isTrue(relMsgConverters.containsKey(sourceName), 
String.format("Unknown source %s", sourceName));
-    SamzaRelConverter converter = relMsgConverters.get(sourceName);
-    String streamName = systemStreamConfig.get(sourceName).getStreamName();
+    final String streamName = 
systemStreamConfig.get(sourceName).getStreamName();
 
     MessageStream<KV<Object, Object>> inputStream = 
streamGraph.getInputStream(streamName);
-    MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = 
inputStream.map(converter::convertToRelMessage);
+    MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName));
 
     context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index fd5195b..7a25efb 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -35,16 +36,25 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
 
 
 /**
  * State that is maintained while translating the Calcite relational graph to 
Samza {@link StreamGraph}.
  */
-public class TranslatorContext {
+public class TranslatorContext implements Cloneable {
+  /**
+   * The internal variables that are shared among all cloned {@link 
TranslatorContext}
+   */
   private final StreamGraph streamGraph;
-  private final Map<Integer, MessageStream> messsageStreams = new HashMap<>();
   private final RexToJavaCompiler compiler;
+  private final Map<String, SamzaRelConverter> relSamzaConverters;
+  private final Map<Integer, MessageStream> messsageStreams;
+  private final Map<Integer, RelNode> relNodes;
 
+  /**
+   * The internal variables that are not shared among all cloned {@link 
TranslatorContext}
+   */
   private final SamzaSqlExecutionContext executionContext;
   private final DataContextImpl dataContext;
 
@@ -90,17 +100,42 @@ public class TranslatorContext {
     }
   }
 
+  private RexToJavaCompiler createExpressionCompiler(RelRoot relRoot) {
+    RelDataTypeFactory dataTypeFactory = 
relRoot.project().getCluster().getTypeFactory();
+    RexBuilder rexBuilder = new SamzaSqlRexBuilder(dataTypeFactory);
+    return new RexToJavaCompiler(rexBuilder);
+  }
+
+  /**
+   * Private constructor to make a clone of {@link TranslatorContext} object
+   *
+   * @param other the original object to copy from
+   */
+  private TranslatorContext(TranslatorContext other) {
+    this.streamGraph  = other.streamGraph;
+    this.compiler = other.compiler;
+    this.relSamzaConverters = other.relSamzaConverters;
+    this.messsageStreams = other.messsageStreams;
+    this.relNodes = other.relNodes;
+    this.executionContext = other.executionContext.clone();
+    this.dataContext = new DataContextImpl();
+  }
+
   /**
    * Create the instance of TranslatorContext
    * @param streamGraph Samza's streamGraph that is populated during the 
translation.
    * @param relRoot Root of the relational graph from calcite.
    * @param executionContext the execution context
+   * @param converters the map of schema to RelData converters
    */
-  public TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, 
SamzaSqlExecutionContext executionContext) {
+  TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) {
     this.streamGraph = streamGraph;
     this.compiler = createExpressionCompiler(relRoot);
     this.executionContext = executionContext;
     this.dataContext = new DataContextImpl();
+    this.relSamzaConverters = converters;
+    this.messsageStreams = new HashMap<>();
+    this.relNodes = new HashMap<>();
   }
 
   /**
@@ -112,22 +147,16 @@ public class TranslatorContext {
     return streamGraph;
   }
 
-  private RexToJavaCompiler createExpressionCompiler(RelRoot relRoot) {
-    RelDataTypeFactory dataTypeFactory = 
relRoot.project().getCluster().getTypeFactory();
-    RexBuilder rexBuilder = new SamzaSqlRexBuilder(dataTypeFactory);
-    return new RexToJavaCompiler(rexBuilder);
-  }
-
   /**
    * Gets execution context.
    *
    * @return the execution context
    */
-  public SamzaSqlExecutionContext getExecutionContext() {
+  SamzaSqlExecutionContext getExecutionContext() {
     return executionContext;
   }
 
-  public DataContext getDataContext() {
+  DataContext getDataContext() {
     return dataContext;
   }
 
@@ -136,7 +165,7 @@ public class TranslatorContext {
    *
    * @return the expression compiler
    */
-  public RexToJavaCompiler getExpressionCompiler() {
+  RexToJavaCompiler getExpressionCompiler() {
     return compiler;
   }
 
@@ -146,7 +175,7 @@ public class TranslatorContext {
    * @param id the id
    * @param stream the stream
    */
-  public void registerMessageStream(int id, MessageStream stream) {
+  void registerMessageStream(int id, MessageStream stream) {
     messsageStreams.put(id, stream);
   }
 
@@ -156,7 +185,29 @@ public class TranslatorContext {
    * @param id the id
    * @return the message stream
    */
-  public MessageStream getMessageStream(int id) {
+  MessageStream getMessageStream(int id) {
     return messsageStreams.get(id);
   }
+
+  void registerRelNode(int id, RelNode relNode) {
+    relNodes.put(id, relNode);
+  }
+
+  RelNode getRelNode(int id) {
+    return relNodes.get(id);
+  }
+
+  SamzaRelConverter getMsgConverter(String source) {
+    return this.relSamzaConverters.get(source);
+  }
+
+  /**
+   * This method helps to create a per task instance of translator context
+   *
+   * @return the cloned instance of {@link TranslatorContext}
+   */
+  @Override
+  public TranslatorContext clone() {
+    return new TranslatorContext(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
deleted file mode 100644
index de0ecf1..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.translator.QueryTranslator;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestQueryTranslator {
-
-  private final Map<String, String> configs = new HashMap<>();
-
-  @Before
-  public void setUp() {
-    configs.put("job.default.system", "kafka");
-  }
-
-  @Test
-  public void testTranslate() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select MyTest(id) from 
testavro.level1.level2.SIMPLE1 as s where s.id = 10");
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-    Assert.assertEquals(1, streamGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals(1, streamGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("SIMPLE1",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-  }
-
-  @Test
-  public void testTranslateComplex() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select Flatten(array_values) from 
testavro.COMPLEX1");
-//    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-//        "Insert into testavro.foo2 select string_value, SUM(id) from 
testavro.COMPLEX1 "
-//            + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), 
string_value");
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-    Assert.assertEquals(1, streamGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals(1, streamGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("COMPLEX1",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-  }
-
-  @Test
-  public void testTranslateSubQuery() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select Flatten(a), id from (select 
id, array_values a, string_value s from testavro.COMPLEX1)");
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-    Assert.assertEquals(1, streamGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals(1, streamGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("COMPLEX1",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableJoinWithoutJoinOperator() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
-            + " where p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableJoinWithFullJoinOperator() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " full join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = IllegalStateException.class)
-  public void testTranslateStreamTableJoinWithSelfJoinOperator() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p1.name as profileName"
-            + " from testavro.PROFILE.`$table` as p1"
-            + " join testavro.PROFILE.`$table` as p2"
-            + " on p1.id = p2.id";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableJoinWithThetaCondition() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on p.id <> pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableCrossJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableJoinWithAndLiteralCondition() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId and p.name = 'John'";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableJoinWithSubQuery() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " where exists "
-            + " (select p.id from testavro.PROFILE.`$table` as p"
-            + " where p.id = pv.profileId)";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateTableTableJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW.`$table` as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamStreamJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateJoinWithIncorrectLeftJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW.`$table` as pv"
-            + " left join testavro.PROFILE as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateJoinWithIncorrectRightJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " right join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableInnerJoinWithMissingStream() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String configIOResolverDomain =
-        
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, 
"config");
-    config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        ConfigBasedIOResolverFactory.class.getName());
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.`$table` as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableInnerJoinWithUdf() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on MyTest(p.id) = MyTest(pv.profileId)";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-
-  @Test
-  public void testTranslateStreamTableInnerJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-
-    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", 
streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic", 
streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-
-    Assert.assertEquals(3, streamGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        
streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        
streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
-  }
-
-  @Test
-  public void testTranslateStreamTableLeftJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " left join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-
-    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", 
streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic",
-        
streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-
-    Assert.assertEquals(3, streamGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        
streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        
streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
-  }
-
-  @Test
-  public void testTranslateStreamTableRightJoin() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PROFILE.`$table` as p"
-            + " right join testavro.PAGEVIEW as pv"
-            + " on p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-
-    Assert.assertEquals(2, streamGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", 
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", 
streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic",
-        
streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-
-    Assert.assertEquals(3, streamGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        
streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        
streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        
streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        
streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
-  }
-
-  @Test
-  public void testTranslateGroupBy() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String sql =
-        "Insert into testavro.pageViewCountTopic"
-            + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
-            + " from testavro.PAGEVIEW as pv"
-            + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
-            + " group by (pv.pageKey)";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-
-    Assert.assertEquals(1, streamGraph.getInputOperators().size());
-    Assert.assertEquals(1, streamGraph.getOutputStreams().size());
-    Assert.assertTrue(streamGraph.hasWindowOrJoins());
-    Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs();
-  }
-
-  @Test (expected = SamzaException.class)
-  public void testTranslateGroupByWithSumAggregator() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String sql =
-        "Insert into testavro.pageViewCountTopic"
-            + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) 
as `sum`"
-            + " from testavro.PAGEVIEW as pv" + " where pv.pageKey = 'job' or 
pv.pageKey = 'inbox'"
-            + " group by (pv.pageKey)";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
-    translator.translate(queryInfo, streamGraph);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
deleted file mode 100644
index 0d48c56..0000000
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlApplicationConfig {
-
-  @Test
-  public void testConfigInit() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into 
testavro.COMPLEX1 select * from testavro.SIMPLE1");
-    String configUdfResolverDomain = 
String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
-    int numUdfs = config.get(configUdfResolverDomain + 
ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length;
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
-    Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size());
-    Assert.assertEquals(numUdfs, 
samzaSqlApplicationConfig.getUdfMetadata().size());
-    Assert.assertEquals(1, 
samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
-    Assert.assertEquals(1, 
samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
-  }
-
-  @Test
-  public void testWrongConfigs() {
-
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-
-
-    try {
-      // Fail because no SQL config
-      new SamzaSqlApplicationConfig(new MapConfig(config));
-      Assert.fail();
-    } catch (SamzaException e) {
-    }
-
-    // Pass
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into 
testavro.COMPLEX1 select * from testavro.SIMPLE1");
-    new SamzaSqlApplicationConfig(new MapConfig(config));
-    testWithoutConfigShouldFail(config, 
SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
-    testWithoutConfigShouldFail(config, 
SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
-
-    String configIOResolverDomain =
-        
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, 
"config");
-    String avroSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", "testavro");
-
-    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
-
-    // Configs for the unused system "log" is not mandatory.
-    String logSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", "log");
-    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
-  }
-
-  private void testWithoutConfigShouldPass(Map<String, String> config, String 
configKey) {
-    Map<String, String> badConfigs = new HashMap<>(config);
-    badConfigs.remove(configKey);
-    new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
-  }
-
-  private void testWithoutConfigShouldFail(Map<String, String> config, String 
configKey) {
-    Map<String, String> badConfigs = new HashMap<>(config);
-    badConfigs.remove(configKey);
-    try {
-      new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
-      Assert.fail();
-    } catch (IllegalArgumentException e) {
-      // swallow
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java
deleted file mode 100644
index e42b55d..0000000
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.util.Map;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.junit.Assert;
-
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.junit.Test;
-
-
-public class TestSamzaSqlApplicationRunner {
-
-  @Test
-  public void testComputeSamzaConfigs() {
-    Map<String, String> configs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as 
long_value from testavro.SIMPLE1";
-    configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1);
-    configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, 
SamzaSqlApplicationRunner.class.getName());
-    MapConfig samzaConfig = new MapConfig(configs);
-    Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
samzaConfig);
-    
Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), 
LocalApplicationRunner.class.getName());
-    // Check whether three new configs added.
-    Assert.assertEquals(newConfigs.size(), configs.size() + 3);
-
-    newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, 
samzaConfig);
-    
Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), 
RemoteApplicationRunner.class.getName());
-
-    // Check whether three new configs added.
-    Assert.assertEquals(newConfigs.size(), configs.size() + 3);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java
deleted file mode 100644
index 5bac472..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-
-import org.apache.samza.sql.testutil.SqlFileParser;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlFileParser {
-
-  public static final String TEST_SQL =
-      "insert into log.outputStream \n" + "\tselect * from 
brooklin.elasticsearchEnterpriseAccounts\n"
-          + "insert into log.outputstream select sfdcAccountId as key, 
organizationUrn as name2, "
-          + "description as name3 from 
brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream 
\n"
-          + "insert into log.outputstream \n" + "\n" + "\tselect id, 
MyTest(id) as id2 \n" + "\n"
-          + "\tfrom tracking.SamzaSqlTestTopic1_p8";
-
-  @Test
-  public void testParseSqlFile() throws IOException {
-    File tempFile = File.createTempFile("testparser", "");
-    PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath());
-    fileWriter.println(TEST_SQL);
-    fileWriter.close();
-
-    List<String> sqlStmts = 
SqlFileParser.parseSqlFile(tempFile.getAbsolutePath());
-    Assert.assertEquals(3, sqlStmts.size());
-    Assert.assertEquals("insert into log.outputStream select * from 
brooklin.elasticsearchEnterpriseAccounts",
-        sqlStmts.get(0));
-    Assert.assertEquals(
-        "insert into log.outputstream select sfdcAccountId as key, 
organizationUrn as name2, description as name3 from 
brooklin.elasticsearchEnterpriseAccounts",
-        sqlStmts.get(1));
-    Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as 
id2 from tracking.SamzaSqlTestTopic1_p8",
-        sqlStmts.get(2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
deleted file mode 100644
index 24faf4b..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-public class TestSamzaSqlQueryParser {
-
-  @Test
-  public void testParseQuery() {
-    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo 
select * from tracking.bar");
-    Assert.assertEquals("log.foo", queryInfo.getSink());
-    Assert.assertEquals(queryInfo.getSelectQuery(), "select * from 
tracking.bar", queryInfo.getSelectQuery());
-    Assert.assertEquals(1, queryInfo.getSources().size());
-    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
-  }
-
-  @Test
-  public void testParseJoinQuery() {
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId";
-    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
-    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
-    Assert.assertEquals(2, queryInfo.getSources().size());
-    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
-    Assert.assertEquals("testavro.PROFILE.$table", 
queryInfo.getSources().get(1));
-  }
-
-  @Test
-  public void testParseInvalidQuery() {
-
-    try {
-      SamzaSqlQueryParser.parseQuery("select * from tracking.bar");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-
-    try {
-      SamzaSqlQueryParser.parseQuery("insert into select * from tracking.bar");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-
-    try {
-      SamzaSqlQueryParser.parseQuery("insert into log.off select from 
tracking.bar");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
deleted file mode 100644
index 689af72..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlRelMessage {
-
-  private List<Object> values = Arrays.asList("value1", "value2");
-  private List<String> names = Arrays.asList("field1", "field2");
-
-  @Test
-  public void testGetField() {
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
-    Assert.assertEquals(values.get(0), 
message.getSamzaSqlRelRecord().getField(names.get(0)).get());
-    Assert.assertEquals(values.get(1), 
message.getSamzaSqlRelRecord().getField(names.get(1)).get());
-  }
-
-  @Test
-  public void testGetNonExistentField() {
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
-    
Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
deleted file mode 100644
index 90fce3b..0000000
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.samza.operators.KV;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.translator.SamzaSqlRelMessageJoinFunction;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlRelMessageJoinFunction {
-
-  private List<String> streamFieldNames = Arrays.asList("field1", "field2", 
"field3", "field4");
-  private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, 
"value4");
-  private List<String> tableFieldNames = Arrays.asList("field11", "field12", 
"field13", "field14");
-  private List<Object> tableFieldValues = Arrays.asList("value1", 1, null, 
"value5");
-
-  @Test
-  public void testWithInnerJoinWithTableOnRight() {
-    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, 
streamFieldValues);
-    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, 
tableFieldValues);
-    JoinRelType joinRelType = JoinRelType.INNER;
-    List<Integer> streamKeyIds = Arrays.asList(0, 1);
-    List<Integer> tableKeyIds = Arrays.asList(0, 1);
-    SamzaSqlCompositeKey compositeKey = 
SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
-    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, 
tableMsg);
-
-    SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, 
streamFieldNames, tableFieldNames);
-    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
-
-    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
-        outMsg.getSamzaSqlRelRecord().getFieldNames().size());
-    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
-    expectedFieldNames.addAll(tableFieldNames);
-    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
-    expectedFieldValues.addAll(tableFieldValues);
-    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), 
expectedFieldValues);
-  }
-
-  @Test
-  public void testWithInnerJoinWithTableOnLeft() {
-    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, 
streamFieldValues);
-    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, 
tableFieldValues);
-    JoinRelType joinRelType = JoinRelType.INNER;
-    List<Integer> streamKeyIds = Arrays.asList(0, 2);
-    List<Integer> tableKeyIds = Arrays.asList(0, 2);
-    SamzaSqlCompositeKey compositeKey = 
SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
-    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, 
tableMsg);
-
-    SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, 
streamFieldNames, tableFieldNames);
-    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
-
-    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
-        outMsg.getSamzaSqlRelRecord().getFieldNames().size());
-    List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
-    expectedFieldNames.addAll(streamFieldNames);
-    List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
-    expectedFieldValues.addAll(streamFieldValues);
-    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), 
expectedFieldValues);
-  }
-
-  @Test
-  public void testNullRecordWithInnerJoin() {
-    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, 
streamFieldValues);
-    JoinRelType joinRelType = JoinRelType.INNER;
-    List<Integer> streamKeyIds = Arrays.asList(0, 1);
-
-    SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, 
streamFieldNames, tableFieldNames);
-    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
-    Assert.assertNull(outMsg);
-  }
-
-  @Test
-  public void testNullRecordWithLeftOuterJoin() {
-    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, 
streamFieldValues);
-    JoinRelType joinRelType = JoinRelType.LEFT;
-    List<Integer> streamKeyIds = Arrays.asList(0, 1);
-
-    SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, 
streamFieldNames,
-            tableFieldNames);
-    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
-
-    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
-        outMsg.getSamzaSqlRelRecord().getFieldNames().size());
-    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
-    expectedFieldNames.addAll(tableFieldNames);
-    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
-    expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null 
).collect(Collectors.toList()));
-    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), 
expectedFieldValues);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java 
b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
new file mode 100644
index 0000000..93e6223
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRelMessage {
+
+  private List<Object> values = Arrays.asList("value1", "value2");
+  private List<String> names = Arrays.asList("field1", "field2");
+
+  @Test
+  public void testGetField() {
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    Assert.assertEquals(values.get(0), 
message.getSamzaSqlRelRecord().getField(names.get(0)).get());
+    Assert.assertEquals(values.get(1), 
message.getSamzaSqlRelRecord().getField(names.get(1)).get());
+  }
+
+  @Test
+  public void testGetNonExistentField() {
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    
Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
new file mode 100644
index 0000000..dac5d02
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.runner;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlApplicationConfig {
+
+  @Test
+  public void testConfigInit() {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into 
testavro.COMPLEX1 select * from testavro.SIMPLE1");
+    String configUdfResolverDomain = 
String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
+    int numUdfs = config.get(configUdfResolverDomain + 
ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length;
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+    Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size());
+    Assert.assertEquals(numUdfs, 
samzaSqlApplicationConfig.getUdfMetadata().size());
+    Assert.assertEquals(1, 
samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
+    Assert.assertEquals(1, 
samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
+  }
+
+  @Test
+  public void testWrongConfigs() {
+
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+
+
+    try {
+      // Fail because no SQL config
+      new SamzaSqlApplicationConfig(new MapConfig(config));
+      Assert.fail();
+    } catch (SamzaException e) {
+    }
+
+    // Pass
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into 
testavro.COMPLEX1 select * from testavro.SIMPLE1");
+    new SamzaSqlApplicationConfig(new MapConfig(config));
+    testWithoutConfigShouldFail(config, 
SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
+    testWithoutConfigShouldFail(config, 
SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
+
+    String configIOResolverDomain =
+        
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, 
"config");
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", "testavro");
+
+    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
+
+    // Configs for the unused system "log" is not mandatory.
+    String logSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", "log");
+    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
+  }
+
+  private void testWithoutConfigShouldPass(Map<String, String> config, String 
configKey) {
+    Map<String, String> badConfigs = new HashMap<>(config);
+    badConfigs.remove(configKey);
+    new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
+  }
+
+  private void testWithoutConfigShouldFail(Map<String, String> config, String 
configKey) {
+    Map<String, String> badConfigs = new HashMap<>(config);
+    badConfigs.remove(configKey);
+    try {
+      new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      // swallow
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
new file mode 100644
index 0000000..b6dcac5
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.runner;
+
+import java.util.Map;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.RemoteApplicationRunner;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.junit.Assert;
+
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.junit.Test;
+
+
+public class TestSamzaSqlApplicationRunner {
+
+  @Test
+  public void testComputeSamzaConfigs() {
+    Map<String, String> configs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as 
long_value from testavro.SIMPLE1";
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1);
+    configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, 
SamzaSqlApplicationRunner.class.getName());
+    MapConfig samzaConfig = new MapConfig(configs);
+    Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
samzaConfig);
+    
Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), 
LocalApplicationRunner.class.getName());
+    // Check whether three new configs added.
+    Assert.assertEquals(newConfigs.size(), configs.size() + 3);
+
+    newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, 
samzaConfig);
+    
Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), 
RemoteApplicationRunner.class.getName());
+
+    // Check whether three new configs added.
+    Assert.assertEquals(newConfigs.size(), configs.size() + 3);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
new file mode 100644
index 0000000..a84f347
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.samza.sql.testutil.SqlFileParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlFileParser {
+
+  public static final String TEST_SQL =
+      "insert into log.outputStream \n" + "\tselect * from 
brooklin.elasticsearchEnterpriseAccounts\n"
+          + "insert into log.outputstream select sfdcAccountId as key, 
organizationUrn as name2, "
+          + "description as name3 from 
brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream 
\n"
+          + "insert into log.outputstream \n" + "\n" + "\tselect id, 
MyTest(id) as id2 \n" + "\n"
+          + "\tfrom tracking.SamzaSqlTestTopic1_p8";
+
+  @Test
+  public void testParseSqlFile() throws IOException {
+    File tempFile = File.createTempFile("testparser", "");
+    PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath());
+    fileWriter.println(TEST_SQL);
+    fileWriter.close();
+
+    List<String> sqlStmts = 
SqlFileParser.parseSqlFile(tempFile.getAbsolutePath());
+    Assert.assertEquals(3, sqlStmts.size());
+    Assert.assertEquals("insert into log.outputStream select * from 
brooklin.elasticsearchEnterpriseAccounts",
+        sqlStmts.get(0));
+    Assert.assertEquals(
+        "insert into log.outputstream select sfdcAccountId as key, 
organizationUrn as name2, description as name3 from 
brooklin.elasticsearchEnterpriseAccounts",
+        sqlStmts.get(1));
+    Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as 
id2 from tracking.SamzaSqlTestTopic1_p8",
+        sqlStmts.get(2));
+  }
+}

Reply via email to