This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 97afd3f  SAMZA-2313: Adding validation for Samza Sql statements. 
(#1148)
97afd3f is described below

commit 97afd3fa902c5fa9919fb3f930531eca84642269
Author: Aditya Toomula <[email protected]>
AuthorDate: Tue Sep 3 15:23:18 2019 -0700

    SAMZA-2313: Adding validation for Samza Sql statements. (#1148)
    
    * Adding validation for Samza Sql statements.
    
    * Adding validation for Samza Sql statements.
    
    * Adding validation for Samza Sql statements.
    
    * Adding validation for Samza Sql statements.
    
    * Adding validation for Samza Sql statements.
    
    * Adding validation for Samza Sql statements.
---
 .../apache/samza/sql/dsl/SamzaSqlDslConverter.java |  49 +++-
 .../org/apache/samza/sql/planner/QueryPlanner.java |  98 ++++----
 .../samza/sql/planner/SamzaSqlValidator.java       | 279 +++++++++++++++++++++
 .../sql/planner/SamzaSqlValidatorException.java    |  40 +++
 .../sql/runner/SamzaSqlApplicationConfig.java      |   4 +
 .../apache/samza/sql/util/SamzaSqlQueryParser.java |   6 +-
 .../samza/sql/planner/TestSamzaSqlValidator.java   | 181 +++++++++++++
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 255 +++++++++++++++----
 .../test/samzasql/TestSamzaSqlRemoteTable.java     |  67 +++--
 9 files changed, 853 insertions(+), 126 deletions(-)

diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index ea0ebfa..b09d3d6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -52,31 +52,58 @@ public class SamzaSqlDslConverter implements DslConverter {
   public Collection<RelRoot> convertDsl(String dsl) {
     // TODO: Introduce an API to parse a dsl string and return one or more sql 
statements
     List<String> sqlStmts = fetchSqlFromConfig(config);
-    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
-    SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
-        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toList()),
-        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
-
-    QueryPlanner planner =
-        new QueryPlanner(sqlConfig.getRelSchemaProviders(), 
sqlConfig.getInputSystemStreamConfigBySource(),
-            sqlConfig.getUdfMetadata());
-
+    QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config)); // one planner, reused for every statement
     List<RelRoot> relRoots = new LinkedList<>();
     for (String sql: sqlStmts) {
       // we always pass only select query to the planner for samza sql. The 
reason is that samza sql supports
       // schema evolution where source and destination could up to an extent 
have independent schema evolution while
       // calcite expects strict comformance of the destination schema with 
that of the fields in the select query.
       SamzaSqlQueryParser.QueryInfo qinfo = 
SamzaSqlQueryParser.parseQuery(sql);
-      relRoots.add(planner.plan(qinfo.getSelectQuery()));
+      RelRoot relRoot = planner.plan(qinfo.getSelectQuery());
+      relRoots.add(relRoot);
     }
     return relRoots;
   }
 
+  /**
+   * Build a {@link SamzaSqlApplicationConfig} from the sources and sinks of the given sql statements, parsing each.
+   * @param sqlStmts List of sql statements
+   * @param config Samza config
+   * @return a new {@link SamzaSqlApplicationConfig}
+   */
+  public static SamzaSqlApplicationConfig getSqlConfig(List<String> sqlStmts, 
Config config) {
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    return new SamzaSqlApplicationConfig(config,
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink)
+            .collect(Collectors.toList()));
+  }
+
+  /**
+   * Create a {@link QueryPlanner} from the schema providers, input stream configs and udf metadata of the config.
+   * @param sqlConfig {@link SamzaSqlApplicationConfig} supplying the planner's inputs
+   * @return a new {@link QueryPlanner}
+   */
+  public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig 
sqlConfig) {
+    return new QueryPlanner(sqlConfig.getRelSchemaProviders(), 
sqlConfig.getInputSystemStreamConfigBySource(),
+        sqlConfig.getUdfMetadata());
+  }
+
+  /**
+   * Parse each sql statement with {@link SamzaSqlQueryParser#parseQuery(String)}, preserving input order.
+   * @param sqlStmts list of sql statements
+   * @return one {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo} per statement, in input order
+   */
   public static List<SamzaSqlQueryParser.QueryInfo> 
fetchQueryInfo(List<String> sqlStmts) {
     return 
sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
   }
 
+  /**
+   * Get list of sql statements based on the property set in the config.
+   * @param config config
+   * @return list of Sql statements
+   */
   public static List<String> fetchSqlFromConfig(Map<String, String> config) {
     List<String> sql;
     if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMT) &&
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index bbf1770..bdf03f7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -36,9 +36,6 @@ import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
-import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
@@ -46,9 +43,7 @@ import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.FrameworkConfig;
@@ -87,49 +82,38 @@ public class QueryPlanner {
     this.udfMetadata = udfMetadata;
   }
 
+  private void registerSourceSchemas(SchemaPlus rootSchema) { // add a table under rootSchema for every input source
+    RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
+
+    for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
+      SchemaPlus previousLevelSchema = rootSchema;
+      List<String> sourceParts = ssc.getSourceParts();
+      RelSchemaProvider relSchemaProvider = 
relSchemaProviders.get(ssc.getSource());
+
+      for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); 
sourcePartIndex++) {
+        String sourcePart = sourceParts.get(sourcePartIndex);
+        if (sourcePartIndex < sourceParts.size() - 1) { // intermediate part: reuse or create a nested sub-schema
+          SchemaPlus sourcePartSchema = 
previousLevelSchema.getSubSchema(sourcePart);
+          if (sourcePartSchema == null) {
+            sourcePartSchema = previousLevelSchema.add(sourcePart, new 
AbstractSchema());
+          }
+          previousLevelSchema = sourcePartSchema;
+        } else {
+          // If the source part is the last one, then fetch the schema 
corresponding to the stream and register.
+          RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider, 
relSchemaConverter);
+          previousLevelSchema.add(sourcePart, 
createTableFromRelSchema(relationalSchema));
+          break;
+        }
+      }
+    }
+  }
+
   public RelRoot plan(String query) {
     try {
       Connection connection = DriverManager.getConnection("jdbc:calcite:");
       CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
       SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
-
-      for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
-        SchemaPlus previousLevelSchema = rootSchema;
-        List<String> sourceParts = ssc.getSourceParts();
-        RelSchemaProvider relSchemaProvider = 
relSchemaProviders.get(ssc.getSource());
-
-        for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); 
sourcePartIndex++) {
-          String sourcePart = sourceParts.get(sourcePartIndex);
-          if (sourcePartIndex < sourceParts.size() - 1) {
-            SchemaPlus sourcePartSchema = 
previousLevelSchema.getSubSchema(sourcePart);
-            if (sourcePartSchema == null) {
-              sourcePartSchema = previousLevelSchema.add(sourcePart, new 
AbstractSchema());
-            }
-            previousLevelSchema = sourcePartSchema;
-          } else {
-            // If the source part is the last one, then fetch the schema 
corresponding to the stream and register.
-            SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
-
-            List<String> fieldNames = new ArrayList<>();
-            List<SqlFieldSchema> fieldTypes = new ArrayList<>();
-            if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
-              fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
-              
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
-            }
-
-            fieldNames.addAll(
-                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
-            fieldTypes.addAll(
-                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
-
-            SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
-            RelDataType relationalSchema = 
relSchemaConverter.convertToRelSchema(newSchema);
-            previousLevelSchema.add(sourcePart, 
createTableFromRelSchema(relationalSchema));
-            break;
-          }
-        }
-      }
+      registerSourceSchemas(rootSchema);
 
       List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
           .map(x -> new SamzaSqlScalarFunctionImpl(x))
@@ -162,12 +146,34 @@ public class QueryPlanner {
       LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel));
       return relRoot;
     } catch (Exception e) {
-      LOG.error("Query planner failed with exception.", e);
-      throw new SamzaException(e);
+      String errorMsg = SamzaSqlValidator.formatErrorString(query, e);
+      LOG.error(errorMsg, e);
+      throw new SamzaException(errorMsg, e);
+    }
+  }
+
+  public static RelDataType getSourceRelSchema(RelSchemaProvider 
relSchemaProvider,
+      RelSchemaConverter relSchemaConverter) {
+    // Fetch the provider's SqlSchema and prepend the implicit key field (type ANY) unless it already has one.
+    SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
+
+    List<String> fieldNames = new ArrayList<>();
+    List<SqlFieldSchema> fieldTypes = new ArrayList<>();
+    if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
+      fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
+      
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
     }
+
+    fieldNames.addAll(
+        
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
+    fieldTypes.addAll(
+        
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
+
+    SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
+    return relSchemaConverter.convertToRelSchema(newSchema);
   }
 
-  private Table createTableFromRelSchema(RelDataType relationalSchema) {
+  private static Table createTableFromRelSchema(RelDataType relationalSchema) {
     return new AbstractTable() {
       public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         return relationalSchema;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
new file mode 100644
index 0000000..08d4497
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -0,0 +1,331 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SamzaSqlValidator that uses calcite engine to convert the sql query to relational graph and validates the query
+ * including the output.
+ */
+public class SamzaSqlValidator {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlValidator.class);
+
+  // Matches the "line N, column M" positions embedded in Calcite error messages; compiled once and reused.
+  private static final Pattern ERROR_POSITION_PATTERN = Pattern.compile("line ([0-9]+), column ([0-9]+)");
+
+  private final Config config;
+
+  public SamzaSqlValidator(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Validate a list of sql statements.
+   *
+   * <p>The select query of each statement is planned with Calcite and the resulting row type is checked against
+   * the schema of the statement's sink. Exceptions raised while parsing the statements happen outside the planning
+   * step and are not wrapped; e.g. a {@link SamzaException} from {@link SamzaSqlQueryParser} propagates as is.
+   *
+   * @param sqlStmts list of sql statements
+   * @throws SamzaSqlValidatorException if planning fails or a selected field does not match the output schema
+   */
+  public void validate(List<String> sqlStmts) throws SamzaSqlValidatorException {
+    SamzaSqlApplicationConfig sqlConfig = SamzaSqlDslConverter.getSqlConfig(sqlStmts, config);
+    QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig);
+
+    for (String sql: sqlStmts) {
+      // we always pass only select query to the planner for samza sql. The reason is that samza sql supports
+      // schema evolution where source and destination could up to an extent have independent schema evolution while
+      // calcite expects strict conformance of the destination schema with that of the fields in the select query.
+      SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
+      RelRoot relRoot;
+      try {
+        relRoot = planner.plan(qinfo.getSelectQuery());
+      } catch (SamzaException e) {
+        throw new SamzaSqlValidatorException("Calcite planning for sql failed.", e);
+      }
+
+      // Now that we have logical plan, validate different aspects.
+      validate(relRoot, qinfo, sqlConfig);
+    }
+  }
+
+  /**
+   * Validate the logical plan of a single statement.
+   *
+   * @param relRoot logical plan of the statement's select query
+   * @param qinfo parsed statement; its sink selects the output schema to validate against
+   * @param sqlConfig application config providing the schema providers
+   * @throws SamzaSqlValidatorException if any validation fails
+   */
+  protected void validate(RelRoot relRoot, SamzaSqlQueryParser.QueryInfo qinfo, SamzaSqlApplicationConfig sqlConfig)
+      throws SamzaSqlValidatorException {
+    // Validate select fields (including Udf return types) with output schema
+    validateOutput(relRoot, sqlConfig.getRelSchemaProviders().get(qinfo.getSink()));
+
+    // TODO:
+    //  1. SAMZA-2314: Validate Udf arguments.
+    //  2. SAMZA-2315: Validate operators. These are the operators that are supported by Calcite but not by Samza Sql.
+    //     Eg: LogicalAggregate with sum function is not supported by Samza Sql.
+  }
+
+  /**
+   * Validate the row type produced by the select query against the output schema.
+   *
+   * @param relRoot logical plan of the select query
+   * @param relSchemaProvider schema provider of the output (sink)
+   * @throws SamzaSqlValidatorException if a selected field is absent from, or incompatible with, the output schema
+   */
+  protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException {
+    RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider,
+        new RelSchemaConverter());
+    // Only the row type is needed, so read it from the root node directly: casting the root to LogicalProject
+    // would throw a ClassCastException whenever the planned root is some other kind of RelNode.
+    RelRecordType projectRecord = (RelRecordType) relRoot.rel.getRowType();
+    validateOutputRecords(outputRecord, projectRecord);
+  }
+
+  /**
+   * Validate that every field of the projected record exists in the output record with a compatible type.
+   *
+   * @param outputRecord record type of the output schema
+   * @param projectRecord record type produced by the select query
+   * @throws SamzaSqlValidatorException for the first field found to be missing or of an incompatible type
+   */
+  protected void validateOutputRecords(RelRecordType outputRecord, RelRecordType projectRecord)
+      throws SamzaSqlValidatorException {
+    Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
+        Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
+    Map<String, RelDataType> projectRecordMap = projectRecord.getFieldList().stream().collect(
+        Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
+
+    // There could be default values for the output schema and hence fields in project schema could be a subset of
+    // fields in output schema.
+    // TODO: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields.
+    for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
+      RelDataType outputFieldType = outputRecordMap.get(entry.getKey());
+      if (outputFieldType == null) {
+        // SamzaSqlRelMessage.OP_NAME is exempt from the output-schema check.
+        if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
+          continue;
+        }
+        String errMsg = String.format("Field '%s' in select query does not match any field in output schema.",
+            entry.getKey());
+        LOG.error(errMsg);
+        throw new SamzaSqlValidatorException(errMsg);
+      } else if (!compareFieldTypes(outputFieldType, entry.getValue())) {
+        String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in"
+            + " output schema.", entry.getKey(), entry.getValue(), outputFieldType);
+        LOG.error(errMsg);
+        throw new SamzaSqlValidatorException(errMsg);
+      }
+    }
+  }
+
+  /**
+   * Check whether a field type produced by the select query is compatible with a field type of the output schema.
+   *
+   * <p>ANY on either side matches everything. Otherwise non-ROW types must be equal or form one of the pairs
+   * CHAR/VARCHAR, INTEGER/BIGINT or FLOAT/DOUBLE, and ROW types are compared field by field.
+   *
+   * @param outputFieldType field type in the output schema
+   * @param selectQueryFieldType field type produced by the select query
+   * @return true if the types are compatible
+   */
+  protected boolean compareFieldTypes(RelDataType outputFieldType, RelDataType selectQueryFieldType) {
+    RelDataType projectFieldType;
+
+    // JavaTypes are relevant for Udf argument and return types
+    // TODO: Support UDF argument validation. Currently, only return types are validated and argument types are
+    //  validated during run-time.
+    if (selectQueryFieldType instanceof RelDataTypeFactoryImpl.JavaType) {
+      projectFieldType = new SamzaSqlJavaTypeFactoryImpl().toSql(selectQueryFieldType);
+    } else {
+      projectFieldType = selectQueryFieldType;
+    }
+
+    SqlTypeName outputSqlType = outputFieldType.getSqlTypeName();
+    SqlTypeName projectSqlType = projectFieldType.getSqlTypeName();
+
+    if (projectSqlType == SqlTypeName.ANY || outputSqlType == SqlTypeName.ANY) {
+      return true;
+    } else if (outputSqlType != SqlTypeName.ROW && outputSqlType == projectSqlType) {
+      return true;
+    }
+
+    switch (outputSqlType) {
+      case CHAR:
+        return projectSqlType == SqlTypeName.VARCHAR;
+      case VARCHAR:
+        return projectSqlType == SqlTypeName.CHAR;
+      case BIGINT:
+        return projectSqlType == SqlTypeName.INTEGER;
+      case INTEGER:
+        return projectSqlType == SqlTypeName.BIGINT;
+      case FLOAT:
+        return projectSqlType == SqlTypeName.DOUBLE;
+      case DOUBLE:
+        return projectSqlType == SqlTypeName.FLOAT;
+      case ROW:
+        // A non-record select field (e.g. a VARCHAR written to a nested record) can never match; checking first
+        // reports it as a mismatch instead of failing the casts below with a ClassCastException.
+        if (!(outputFieldType instanceof RelRecordType) || !(projectFieldType instanceof RelRecordType)) {
+          return false;
+        }
+        try {
+          validateOutputRecords((RelRecordType) outputFieldType, (RelRecordType) projectFieldType);
+        } catch (SamzaSqlValidatorException e) {
+          LOG.error("A field in select query does not match with the output schema.", e);
+          return false;
+        }
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  // -- All Static Methods below --
+
+  /**
+   * Format a Calcite parse or validation exception into a more readable form that points at the offending text.
+   *
+   * <p>The first (and, if present, second) "line N, column M" position in the exception message marks the erroneous
+   * span, which is underlined with '^' characters below the corresponding query line; without a position only the
+   * message is reported. An error spanning multiple lines, or any other formatting failure, yields a fallback message.
+   *
+   * <p>As an example, consider the select query below, which fails Calcite validation due to a non-existing field:
+   * <pre>
+   * SELECT `non_existing_name`, `name` AS `string_value`
+   * FROM `testavro`.`level1`.`level2`.`SIMPLE1` AS `s`
+   * WHERE `s`.`id` = 1
+   * </pre>
+   * together with the exception message:
+   * <pre>
+   * org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 26: Column
+   * 'non_existing_name' not found in any table
+   * </pre>
+   * For these inputs this function returns:
+   * <pre>
+   * Sql syntax error:
+   *
+   * SELECT `non_existing_name`, `name` AS `string_value`
+   * -------^^^^^^^^^^^^^^^^^^^--------------------------
+   * FROM `testavro`.`level1`.`level2`.`SIMPLE1` AS `s`
+   * WHERE `s`.`id` = 1
+   *
+   * From line 1, column 8 to line 1, column 26: Column 'non_existing_name' not found in any table
+   * </pre>
+   *
+   * @param query sql query
+   * @param e Exception returned by Calcite
+   * @return formatted error string
+   */
+  public static String formatErrorString(String query, Exception e) {
+    // getMessage() may be null (e.g. for a NullPointerException). Fall back to toString() so that this method,
+    // which callers invoke from their catch blocks, never throws and masks the original failure.
+    String exceptionMsg = e.getMessage() != null ? e.getMessage() : e.toString();
+    Matcher matcher = ERROR_POSITION_PATTERN.matcher(exceptionMsg);
+    String[] queryLines = query.split("\\n");
+    StringBuilder result = new StringBuilder();
+    int startColIdx, endColIdx, startLineIdx, endLineIdx;
+
+    try {
+      if (matcher.find()) {
+        LOG.debug(matcher.group());
+        // Calcite positions are 1-based; convert them to 0-based indices.
+        startLineIdx = Integer.parseInt(matcher.group(1)) - 1;
+        startColIdx = Integer.parseInt(matcher.group(2)) - 1;
+        if (matcher.find()) {
+          LOG.debug(matcher.group());
+          endLineIdx = Integer.parseInt(matcher.group(1)) - 1;
+          endColIdx = Integer.parseInt(matcher.group(2)) - 1;
+        } else {
+          endColIdx = startColIdx;
+          endLineIdx = startLineIdx;
+        }
+        int lineLen = endLineIdx - startLineIdx;
+        int colLen = endColIdx - startColIdx + 1;
+
+        // Error spanning across multiple lines is not supported yet.
+        if (lineLen > 0) {
+          throw new SamzaException("lineLen formatting validation error: error cannot span across multiple lines.");
+        }
+
+        int lineIdx = 0;
+        for (String line : queryLines) {
+          result.append(line)
+              .append("\n");
+          if (lineIdx == startLineIdx) {
+            // The dash line is one char shorter than the query line; replacing [startColIdx, endColIdx) with
+            // colLen (= endColIdx - startColIdx + 1) carets adds that char back, so the marker lines up with the query.
+            String lineStr = getStringWithRepeatedChars('-', line.length() - 1);
+            String pointerStr = getStringWithRepeatedChars('^', colLen);
+            String errorMarkerStr =
+                new StringBuilder(lineStr).replace(startColIdx, endColIdx, pointerStr).toString();
+            result.append(errorMarkerStr)
+                .append("\n");
+          }
+          lineIdx++;
+        }
+      }
+
+      // Keep only the text after the last "Exception:" to drop the exception class-name prefixes.
+      String[] errorMsgParts = exceptionMsg.split("Exception:");
+      result.append("\n")
+          .append(errorMsgParts[errorMsgParts.length - 1].trim());
+      return String.format("Sql syntax error:\n\n%s\n",
+          result);
+    } catch (Exception ex) {
+      // Ignore any formatting errors.
+      LOG.error("Formatting error (Not the actual error. Look for the logs for actual error)", ex);
+      return String.format("Failed with formatting exception (not the actual error) for the following sql"
+              + " statement:\n\"%s\"\n\n%s", query, exceptionMsg);
+    }
+  }
+
+  private static String getStringWithRepeatedChars(char ch, int len) {
+    char[] chars = new char[len];
+    Arrays.fill(chars, ch);
+    return new String(chars);
+  }
+}
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java
new file mode 100644
index 0000000..812508f
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+/**
+ * Checked exception thrown by {@link SamzaSqlValidator} when a SQL statement fails validation.
+ */
+public class SamzaSqlValidatorException extends Exception {
+  public SamzaSqlValidatorException() {
+  }
+
+  public SamzaSqlValidatorException(String message) {
+    super(message);
+  }
+
+  public SamzaSqlValidatorException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SamzaSqlValidatorException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 07aa6bb..c494382 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -308,6 +308,10 @@ public class SamzaSqlApplicationConfig {
     return outputSystemStreamConfigsBySource;
   }
 
+  public SqlIOConfig getOutputSqlIOConfig(String source) {
+    return outputSystemStreamConfigsBySource.get(source); // NOTE(review): presumably null if source is not an output
+  }
+
   public Map<String, SamzaRelConverter> getSamzaRelConverters() {
     return samzaRelConvertersBySource;
   }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java 
b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index d2ed991..630d3f3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -53,6 +53,8 @@ import org.apache.calcite.tools.Planner;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.interfaces.SamzaSqlDriver;
 import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+import org.apache.samza.sql.planner.QueryPlanner;
+import org.apache.samza.sql.planner.SamzaSqlValidator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +104,9 @@ public class SamzaSqlQueryParser {
     try {
       sqlNode = planner.parse(sql);
     } catch (SqlParseException e) {
-      throw new SamzaException(e);
+      String errorMsg = SamzaSqlValidator.formatErrorString(sql, e);
+      LOG.error(errorMsg, e);
+      throw new SamzaException(errorMsg, e);
     }
 
     String sink;
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
new file mode 100644
index 0000000..b2ce6f6
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
@@ -0,0 +1,181 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+
+
+public class TestSamzaSqlValidator {
+
+  private final Map<String, String> configs = new HashMap<>();
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestSamzaSqlValidator.class);
+
+  @Before
+  public void setUp() {
+    configs.put("job.default.system", "kafka");
+  }
+
+  @Test
+  public void testBasicValidation() throws SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select id, name as string_value"
+            + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test (expected = SamzaSqlValidatorException.class)
+  public void testNonExistingOutputField() throws SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select id, name as strings_value"
+            + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testNonExistingSelectField() throws SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select non_existing_field, name 
as string_value"
+            + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+    SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+  }
+
+  @Test(expected = SamzaSqlValidatorException.class)
+  public void testCalciteErrorString() throws SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select non_existing_field, name 
as string_value"
+            + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+
+    try {
+      SamzaSqlApplicationRunner.computeSamzaConfigs(true, new 
MapConfig(config));
+    } catch (SamzaException e) {
+      Assert.assertTrue(e.getMessage().contains("line 1, column 8 to line 1, 
column 27: Column 'non_existing_field' not found"));
+      throw new SamzaSqlValidatorException("Calcite planning for sql failed.", 
e);
+    }
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testNonExistingUdf() throws SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select NonExistingUdf(name) as 
string_value"
+            + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test (expected = SamzaSqlValidatorException.class)
+  public void testSelectAndOutputValidationFailure() throws 
SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic(id) select name as long_value"
+            + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testValidationStreamTableLeftJoinWithWhere() throws 
SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey) 
select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv left join 
testavro.PROFILE.`$table` as p where p.id = pv.profileId";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test (expected = SamzaException.class)
+  public void testUnsupportedOperator() throws SamzaSqlValidatorException {
+    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    String sql =
+        "Insert into testavro.pageViewCountTopic(jobName, pageKey, `count`)"
+            + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
+            + " from testavro.PAGEVIEW as pv"
+            + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+            + " group bys (pv.pageKey)";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+  }
+
+  @Test
+  public void testFormatErrorString() {
+    String sql =
+        "select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n"
+            + "from testavro.PAGEVIEW as pv\n"
+            + "where pv.pageKey = 'job' or pv.pageKey = 'inbox'\n"
+            + "group bys (pv.pageKey)";
+    String errorStr =
+        "org.apache.calcite.tools.ValidationException: 
org.apache.calcite.runtime.CalciteContextException: "
+            + "From line 3, column 7 to line 3, column 16: Column 'pv.pageKey' 
not found in any table";
+    String formattedErrStr = SamzaSqlValidator.formatErrorString(sql, new 
Exception(errorStr));
+    LOG.info(formattedErrStr);
+  }
+
+  @Test
+  public void testExceptionInFormatErrorString() {
+    String sql =
+        "select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n"
+            + "from testavro.PAGEVIEW as pv\n"
+            + "where pv.pageKey = 'job' or pv.pageKey = 'inbox'\n"
+            + "group bys (pv.pageKey)";
+    String errorStr =
+        "org.apache.calcite.tools.ValidationException: 
org.apache.calcite.runtime.CalciteContextException: "
+            + "From line 3, column 7 to line 3, column 16: Column 'pv.pageKey' 
not found in any table";
+    String formattedErrStr = SamzaSqlValidator.formatErrorString(sql, new 
Exception(errorStr));
+    LOG.info(formattedErrStr);
+  }
+}
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index ac84fe8..d81cb3c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -32,7 +32,10 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.planner.SamzaSqlValidator;
+import org.apache.samza.sql.planner.SamzaSqlValidatorException;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
 import org.apache.samza.sql.util.JsonUtil;
@@ -51,7 +54,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
 
   @Test
-  public void testEndToEnd() {
+  public void testEndToEnd() throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -59,7 +62,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -70,7 +77,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndWithSystemMessages() {
+  public void testEndToEndWithSystemMessages() throws 
SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -82,7 +89,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -92,7 +103,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndDisableSystemMessages() {
+  public void testEndToEndDisableSystemMessages() throws 
SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -105,7 +116,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS, 
"false");
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -115,7 +130,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndWithNullRecords() {
+  public void testEndToEndWithNullRecords() throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -124,7 +139,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> x.getMessage() == null || ((GenericRecord) 
x.getMessage()).get("id") == null ? null
@@ -141,7 +160,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndWithDifferentSystemSameStream() {
+  public void testEndToEndWithDifferentSystemSameStream() throws 
SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -149,7 +168,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     String sql = "Insert into testavro2.SIMPLE1 select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -160,7 +183,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndMultiSqlStmts() {
+  public void testEndToEndMultiSqlStmts() throws SamzaSqlValidatorException {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -168,7 +191,12 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     String sql2 = "Insert into testavro.SIMPLE3 select * from 
testavro.SIMPLE2";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -180,7 +208,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
+  public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() 
throws SamzaSqlValidatorException {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -189,7 +217,12 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -201,7 +234,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndFanIn() {
+  public void testEndToEndFanIn() throws SamzaSqlValidatorException {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -209,7 +242,12 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     String sql2 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -221,7 +259,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndFanOut() {
+  public void testEndToEndFanOut() throws SamzaSqlValidatorException {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -229,7 +267,12 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     String sql2 = "Insert into testavro.SIMPLE3 select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
         .sorted()
@@ -250,7 +293,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), 
LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -270,14 +317,18 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + " select * from testavro.COMPLEX1 where bool_value IS TRUE";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
     Assert.assertEquals(numMessages / 2, outMessages.size());
   }
 
   @Test
-  public void testEndToEndCompoundBooleanCheck() {
+  public void testEndToEndCompoundBooleanCheck() throws 
SamzaSqlValidatorException {
 
     int numMessages = 20;
 
@@ -287,14 +338,18 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + " select * from testavro.COMPLEX1 where id >= 0 and bool_value IS 
TRUE";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
     Assert.assertEquals(numMessages / 2, outMessages.size());
   }
 
   @Test
-  public void testEndToEndCompoundBooleanCheckWorkaround() {
+  public void testEndToEndCompoundBooleanCheckWorkaround() throws 
SamzaSqlValidatorException {
 
     int numMessages = 20;
 
@@ -305,7 +360,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + " select * from testavro.COMPLEX1 where id >= 0 and CAST(bool_value 
AS VARCHAR) =  'TRUE'";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -322,7 +381,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) 
THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL 
END as string_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -342,7 +405,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + " select id, name as string_value from testavro.SIMPLE1 where name 
like 'Name%'";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
@@ -365,7 +432,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -379,18 +450,22 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
 
   @Test
-  public void testEndToEndComplexRecord() {
+  public void testEndToEndComplexRecord() throws SamzaSqlValidatorException {
     int numMessages = 10;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
     String sql1 =
         "Insert into testavro.outputTopic"
-            + " select map_values['key0'] as string_value, union_value, 
array_values[0] as string_value, map_values, id, bytes_value, fixed_value, 
float_value "
-            + " from testavro.COMPLEX1";
+            + " select map_values['key0'] as string_value, union_value, 
array_values, map_values, id, bytes_value,"
+            + " fixed_value, float_value from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -399,7 +474,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
   @Ignore
   @Test
-  public void testEndToEndNestedRecord() {
+  public void testEndToEndNestedRecord() throws SamzaSqlValidatorException {
     int numMessages = 10;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -410,7 +485,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
             + " from testavro.PROFILE as p";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -426,7 +505,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id)) 
as id from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -447,7 +530,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         "Insert into testavro.outputTopic(id) select Flatten(a) as id from 
(select MyTestArray(id) a from testavro.SIMPLE1)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
 
@@ -460,7 +547,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testUdfUnTypedArgumentToTypedUdf() {
+  public void testUdfUnTypedArgumentToTypedUdf() throws 
SamzaSqlValidatorException {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -468,7 +555,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + "select id, MyTest(MyTestObj(id)) as long_value from 
testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     LOG.info("output Messages " + TestAvroSystemFactory.messages);
 
@@ -488,7 +579,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + "select id, MYTest(id) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     LOG.info("output Messages " + TestAvroSystemFactory.messages);
 
@@ -532,7 +627,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
         + "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from 
testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     LOG.info("output Messages " + TestAvroSystemFactory.messages);
 
@@ -559,7 +658,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
             + "where RegexMatch('.*4', name)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     LOG.info("output Messages " + TestAvroSystemFactory.messages);
     // There should be two messages that contain "4"
@@ -582,7 +685,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -610,7 +717,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -638,7 +749,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -666,7 +781,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> {
@@ -700,7 +819,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -733,7 +856,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -763,7 +890,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -793,7 +924,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -825,7 +960,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -855,7 +994,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -885,7 +1028,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -918,7 +1065,11 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     // Let's capture the list of windows/counts per key.
     HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>();
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index c2219e8..7eafd48 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -25,7 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.planner.SamzaSqlValidator;
+import org.apache.samza.sql.planner.SamzaSqlValidatorException;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
 import org.apache.samza.sql.util.JsonUtil;
@@ -38,7 +41,7 @@ import org.junit.Test;
 
 public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
   @Test
-  public void testSinkEndToEndWithKey() {
+  public void testSinkEndToEndWithKey() throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     RemoteStoreIOResolverTestFactory.records.clear();
@@ -48,14 +51,18 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
     String sql = "Insert into testRemoteStore.testTable.`$table` select 
__key__, id, name from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
   }
 
   @Test
   @Ignore("Disabled due to flakiness related to data generation; Refer Pull 
Request #905 for details")
-  public void testSinkEndToEndWithKeyWithNullRecords() {
+  public void testSinkEndToEndWithKeyWithNullRecords() throws 
SamzaSqlValidatorException {
     int numMessages = 20;
 
     RemoteStoreIOResolverTestFactory.records.clear();
@@ -68,13 +75,17 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
   }
 
   @Test (expected = AssertionError.class)
-  public void testSinkEndToEndWithoutKey() {
+  public void testSinkEndToEndWithoutKey() throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     RemoteStoreIOResolverTestFactory.records.clear();
@@ -83,13 +94,17 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
     String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) 
select id, name from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
   }
 
   @Test
-  public void testSourceEndToEndWithKey() {
+  public void testSourceEndToEndWithKey() throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -107,7 +122,11 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -120,7 +139,7 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testSourceEndToEndWithKeyAndUdf() {
+  public void testSourceEndToEndWithKeyAndUdf() throws 
SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -138,7 +157,11 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -151,7 +174,7 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testSourceEndToEndWithKeyWithNullForeignKeys() {
+  public void testSourceEndToEndWithKeyWithNullForeignKeys() throws 
SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -170,7 +193,11 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -183,7 +210,7 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() {
+  public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() 
throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -202,7 +229,11 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
@@ -215,7 +246,7 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
+  public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws 
SamzaSqlValidatorException {
     int numMessages = 21;
 
     TestAvroSystemFactory.messages.clear();
@@ -237,7 +268,11 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
-    runApplication(new MapConfig(staticConfigs));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
 
     Assert.assertEquals((numMessages + 1) / 2, 
RemoteStoreIOResolverTestFactory.records.size());
   }

Reply via email to