This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 97afd3f SAMZA-2313: Adding validation for Samza Sql statements.
(#1148)
97afd3f is described below
commit 97afd3fa902c5fa9919fb3f930531eca84642269
Author: Aditya Toomula <[email protected]>
AuthorDate: Tue Sep 3 15:23:18 2019 -0700
SAMZA-2313: Adding validation for Samza Sql statements. (#1148)
* Adding validation for Samza Sql statements.
* Adding validation for Samza Sql statements.
* Adding validation for Samza Sql statements.
* Adding validation for Samza Sql statements.
* Adding validation for Samza Sql statements.
* Adding validation for Samza Sql statements.
---
.../apache/samza/sql/dsl/SamzaSqlDslConverter.java | 49 +++-
.../org/apache/samza/sql/planner/QueryPlanner.java | 98 ++++----
.../samza/sql/planner/SamzaSqlValidator.java | 279 +++++++++++++++++++++
.../sql/planner/SamzaSqlValidatorException.java | 40 +++
.../sql/runner/SamzaSqlApplicationConfig.java | 4 +
.../apache/samza/sql/util/SamzaSqlQueryParser.java | 6 +-
.../samza/sql/planner/TestSamzaSqlValidator.java | 181 +++++++++++++
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 255 +++++++++++++++----
.../test/samzasql/TestSamzaSqlRemoteTable.java | 67 +++--
9 files changed, 853 insertions(+), 126 deletions(-)
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index ea0ebfa..b09d3d6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -52,31 +52,58 @@ public class SamzaSqlDslConverter implements DslConverter {
public Collection<RelRoot> convertDsl(String dsl) {
// TODO: Introduce an API to parse a dsl string and return one or more sql
statements
List<String> sqlStmts = fetchSqlFromConfig(config);
- List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
- SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
-
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toList()),
-
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
-
- QueryPlanner planner =
- new QueryPlanner(sqlConfig.getRelSchemaProviders(),
sqlConfig.getInputSystemStreamConfigBySource(),
- sqlConfig.getUdfMetadata());
-
+ QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
List<RelRoot> relRoots = new LinkedList<>();
for (String sql: sqlStmts) {
// we always pass only select query to the planner for samza sql. The
reason is that samza sql supports
// schema evolution where source and destination could up to an extent
have independent schema evolution while
// calcite expects strict conformance of the destination schema with
that of the fields in the select query.
SamzaSqlQueryParser.QueryInfo qinfo =
SamzaSqlQueryParser.parseQuery(sql);
- relRoots.add(planner.plan(qinfo.getSelectQuery()));
+ RelRoot relRoot = planner.plan(qinfo.getSelectQuery());
+ relRoots.add(relRoot);
}
return relRoots;
}
+ /**
+ * Get {@link SamzaSqlApplicationConfig} given sql statements and samza
config.
+ * @param sqlStmts List of sql statements
+ * @param config Samza config
+ * @return {@link SamzaSqlApplicationConfig}
+ */
+ public static SamzaSqlApplicationConfig getSqlConfig(List<String> sqlStmts,
Config config) {
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ return new SamzaSqlApplicationConfig(config,
+
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink)
+ .collect(Collectors.toList()));
+ }
+
+ /**
+ * Get {@link QueryPlanner} given {@link SamzaSqlApplicationConfig}
+ * @param sqlConfig {@link SamzaSqlApplicationConfig}
+ * @return {@link QueryPlanner}
+ */
+ public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig
sqlConfig) {
+ return new QueryPlanner(sqlConfig.getRelSchemaProviders(),
sqlConfig.getInputSystemStreamConfigBySource(),
+ sqlConfig.getUdfMetadata());
+ }
+
+ /**
+ * Get list of {@link
org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo} given list of sql
statements.
+ * @param sqlStmts list of sql statements
+ * @return list of {@link
org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo}
+ */
public static List<SamzaSqlQueryParser.QueryInfo>
fetchQueryInfo(List<String> sqlStmts) {
return
sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
}
+ /**
+ * Get list of sql statements based on the property set in the config.
+ * @param config config
+ * @return list of Sql statements
+ */
public static List<String> fetchSqlFromConfig(Map<String, String> config) {
List<String> sql;
if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMT) &&
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index bbf1770..bdf03f7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -36,9 +36,6 @@ import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
-import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
@@ -46,9 +43,7 @@ import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
@@ -87,49 +82,38 @@ public class QueryPlanner {
this.udfMetadata = udfMetadata;
}
+ private void registerSourceSchemas(SchemaPlus rootSchema) {
+ RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
+
+ for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
+ SchemaPlus previousLevelSchema = rootSchema;
+ List<String> sourceParts = ssc.getSourceParts();
+ RelSchemaProvider relSchemaProvider =
relSchemaProviders.get(ssc.getSource());
+
+ for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size();
sourcePartIndex++) {
+ String sourcePart = sourceParts.get(sourcePartIndex);
+ if (sourcePartIndex < sourceParts.size() - 1) {
+ SchemaPlus sourcePartSchema =
previousLevelSchema.getSubSchema(sourcePart);
+ if (sourcePartSchema == null) {
+ sourcePartSchema = previousLevelSchema.add(sourcePart, new
AbstractSchema());
+ }
+ previousLevelSchema = sourcePartSchema;
+ } else {
+ // If the source part is the last one, then fetch the schema
corresponding to the stream and register.
+ RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider,
relSchemaConverter);
+ previousLevelSchema.add(sourcePart,
createTableFromRelSchema(relationalSchema));
+ break;
+ }
+ }
+ }
+ }
+
public RelRoot plan(String query) {
try {
Connection connection = DriverManager.getConnection("jdbc:calcite:");
CalciteConnection calciteConnection =
connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
- RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
-
- for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
- SchemaPlus previousLevelSchema = rootSchema;
- List<String> sourceParts = ssc.getSourceParts();
- RelSchemaProvider relSchemaProvider =
relSchemaProviders.get(ssc.getSource());
-
- for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size();
sourcePartIndex++) {
- String sourcePart = sourceParts.get(sourcePartIndex);
- if (sourcePartIndex < sourceParts.size() - 1) {
- SchemaPlus sourcePartSchema =
previousLevelSchema.getSubSchema(sourcePart);
- if (sourcePartSchema == null) {
- sourcePartSchema = previousLevelSchema.add(sourcePart, new
AbstractSchema());
- }
- previousLevelSchema = sourcePartSchema;
- } else {
- // If the source part is the last one, then fetch the schema
corresponding to the stream and register.
- SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
-
- List<String> fieldNames = new ArrayList<>();
- List<SqlFieldSchema> fieldTypes = new ArrayList<>();
- if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
- fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
-
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
- }
-
- fieldNames.addAll(
-
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
- fieldTypes.addAll(
-
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
-
- SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
- RelDataType relationalSchema =
relSchemaConverter.convertToRelSchema(newSchema);
- previousLevelSchema.add(sourcePart,
createTableFromRelSchema(relationalSchema));
- break;
- }
- }
- }
+ registerSourceSchemas(rootSchema);
List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
.map(x -> new SamzaSqlScalarFunctionImpl(x))
@@ -162,12 +146,34 @@ public class QueryPlanner {
LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel));
return relRoot;
} catch (Exception e) {
- LOG.error("Query planner failed with exception.", e);
- throw new SamzaException(e);
+ String errorMsg = SamzaSqlValidator.formatErrorString(query, e);
+ LOG.error(errorMsg, e);
+ throw new SamzaException(errorMsg, e);
+ }
+ }
+
+ public static RelDataType getSourceRelSchema(RelSchemaProvider
relSchemaProvider,
+ RelSchemaConverter relSchemaConverter) {
+ // If the source part is the last one, then fetch the schema corresponding
to the stream and register.
+ SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
+
+ List<String> fieldNames = new ArrayList<>();
+ List<SqlFieldSchema> fieldTypes = new ArrayList<>();
+ if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
+ fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
+
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
}
+
+ fieldNames.addAll(
+
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
+ fieldTypes.addAll(
+
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
+
+ SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
+ return relSchemaConverter.convertToRelSchema(newSchema);
}
- private Table createTableFromRelSchema(RelDataType relationalSchema) {
+ private static Table createTableFromRelSchema(RelDataType relationalSchema) {
return new AbstractTable() {
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return relationalSchema;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
new file mode 100644
index 0000000..08d4497
--- /dev/null
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -0,0 +1,279 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SamzaSqlValidator that uses calcite engine to convert the sql query to
relational graph and validates the query
+ * including the output.
+ */
+public class SamzaSqlValidator {
+ private static final Logger LOG =
LoggerFactory.getLogger(SamzaSqlValidator.class);
+
+ private final Config config;
+
+ public SamzaSqlValidator(Config config) {
+ this.config = config;
+ }
+
+ /**
+ * Validate a list of sql statements
+ * @param sqlStmts list of sql statements
+ * @throws SamzaSqlValidatorException
+ */
+ public void validate(List<String> sqlStmts) throws
SamzaSqlValidatorException {
+ SamzaSqlApplicationConfig sqlConfig =
SamzaSqlDslConverter.getSqlConfig(sqlStmts, config);
+ QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig);
+
+ for (String sql: sqlStmts) {
+ // we always pass only select query to the planner for samza sql. The
reason is that samza sql supports
+ // schema evolution where source and destination could up to an extent
have independent schema evolution while
+ // calcite expects strict conformance of the destination schema with
that of the fields in the select query.
+ SamzaSqlQueryParser.QueryInfo qinfo =
SamzaSqlQueryParser.parseQuery(sql);
+ RelRoot relRoot;
+ try {
+ relRoot = planner.plan(qinfo.getSelectQuery());
+ } catch (SamzaException e) {
+ throw new SamzaSqlValidatorException("Calcite planning for sql
failed.", e);
+ }
+
+ // Now that we have logical plan, validate different aspects.
+ validate(relRoot, qinfo, sqlConfig);
+ }
+ }
+
+ protected void validate(RelRoot relRoot, SamzaSqlQueryParser.QueryInfo
qinfo, SamzaSqlApplicationConfig sqlConfig)
+ throws SamzaSqlValidatorException {
+ // Validate select fields (including Udf return types) with output schema
+ validateOutput(relRoot,
sqlConfig.getRelSchemaProviders().get(qinfo.getSink()));
+
+ // TODO:
+ // 1. SAMZA-2314: Validate Udf arguments.
+ // 2. SAMZA-2315: Validate operators. These are the operators that are
supported by Calcite but not by Samza Sql.
+ // Eg: LogicalAggregate with sum function is not supported by Samza
Sql.
+ }
+
+ protected void validateOutput(RelRoot relRoot, RelSchemaProvider
relSchemaProvider) throws SamzaSqlValidatorException {
+ RelRecordType outputRecord = (RelRecordType)
QueryPlanner.getSourceRelSchema(relSchemaProvider,
+ new RelSchemaConverter());
+ LogicalProject project = (LogicalProject) relRoot.rel;
+ RelRecordType projetRecord = (RelRecordType) project.getRowType();
+ validateOutputRecords(outputRecord, projetRecord);
+ }
+
+ protected void validateOutputRecords(RelRecordType outputRecord,
RelRecordType projectRecord)
+ throws SamzaSqlValidatorException {
+ Map<String, RelDataType> outputRecordMap =
outputRecord.getFieldList().stream().collect(
+ Collectors.toMap(RelDataTypeField::getName,
RelDataTypeField::getType));
+ Map<String, RelDataType> projectRecordMap =
projectRecord.getFieldList().stream().collect(
+ Collectors.toMap(RelDataTypeField::getName,
RelDataTypeField::getType));
+
+ // There could be default values for the output schema and hence fields in
project schema could be a subset of
+ // fields in output schema.
+ // TODO: SAMZA-2316: Validate that all non-default value fields in output
schema are set in the projected fields.
+ for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
+ RelDataType outputFieldType = outputRecordMap.get(entry.getKey());
+ if (outputFieldType == null) {
+ if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
+ continue;
+ }
+ String errMsg = String.format("Field '%s' in select query does not
match any field in output schema.",
+ entry.getKey());
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ } else if (!compareFieldTypes(outputFieldType, entry.getValue())) {
+ String errMsg = String.format("Field '%s' with type '%s' in select
query does not match the field type '%s' in"
+ + " output schema.", entry.getKey(), entry.getValue(),
outputFieldType);
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ }
+ }
+ }
+
+  /**
+   * Checks whether the type of a field in the select query is compatible with the type of the corresponding
+   * field in the output schema.
+   *
+   * Types are compatible when either side is ANY, when both have the same non-ROW sql type, when they form one
+   * of the pairs CHAR/VARCHAR, BIGINT/INTEGER or FLOAT/DOUBLE, or when both are records whose fields pass
+   * {@link #validateOutputRecords(RelRecordType, RelRecordType)}.
+   *
+   * @param outputFieldType type of the field in the output schema
+   * @param selectQueryFieldType type of the field in the select query
+   * @return true if the types are compatible, false otherwise
+   */
+  protected boolean compareFieldTypes(RelDataType outputFieldType, RelDataType selectQueryFieldType) {
+    RelDataType projectFieldType;
+
+    // JavaTypes are relevant for Udf argument and return types
+    // TODO: Support UDF argument validation. Currently, only return types are validated and argument types are
+    // validated during run-time.
+    if (selectQueryFieldType instanceof RelDataTypeFactoryImpl.JavaType) {
+      projectFieldType = new SamzaSqlJavaTypeFactoryImpl().toSql(selectQueryFieldType);
+    } else {
+      projectFieldType = selectQueryFieldType;
+    }
+
+    SqlTypeName outputSqlType = outputFieldType.getSqlTypeName();
+    SqlTypeName projectSqlType = projectFieldType.getSqlTypeName();
+
+    if (projectSqlType == SqlTypeName.ANY || outputSqlType == SqlTypeName.ANY) {
+      return true;
+    } else if (outputSqlType != SqlTypeName.ROW && outputSqlType == projectSqlType) {
+      return true;
+    }
+
+    switch (outputSqlType) {
+      case CHAR:
+        return projectSqlType == SqlTypeName.VARCHAR;
+      case VARCHAR:
+        return projectSqlType == SqlTypeName.CHAR;
+      case BIGINT:
+        return projectSqlType == SqlTypeName.INTEGER;
+      case INTEGER:
+        return projectSqlType == SqlTypeName.BIGINT;
+      case FLOAT:
+        return projectSqlType == SqlTypeName.DOUBLE;
+      case DOUBLE:
+        return projectSqlType == SqlTypeName.FLOAT;
+      case ROW:
+        // A record can only match another record. Without this check, a non-record select field (e.g. INTEGER)
+        // compared against a record output field would fail with a ClassCastException instead of a mismatch.
+        if (!(outputFieldType instanceof RelRecordType) || !(projectFieldType instanceof RelRecordType)) {
+          return false;
+        }
+        try {
+          validateOutputRecords((RelRecordType) outputFieldType, (RelRecordType) projectFieldType);
+        } catch (SamzaSqlValidatorException e) {
+          LOG.error("A field in select query does not match with the output schema.", e);
+          return false;
+        }
+        return true;
+      default:
+        return false;
+    }
+  }
+
+ // -- All Static Methods below --
+
+ /**
+ * Format the Calcite exception to a more readable form.
+ *
+ * As an example, consider the below sql query which fails calcite
validation due to a non existing field :
+ * "Insert into testavro.outputTopic(id) select non_existing_name, name as
string_value"
+ * + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1"
+ *
+ * This function takes in the above multi-line sql query and the below
sample exception as input:
+ * "org.apache.calcite.runtime.CalciteContextException: From line 1, column
8 to line 1, column 26: Column
+ * 'non_existing_name' not found in any table"
+ *
+ * And returns the following string:
+ * 2019-08-30 09:05:08 ERROR QueryPlanner:174 - Failed with exception for
the following sql statement:
+ *
+ * Sql syntax error:
+ *
+ * SELECT `non_existing_name`, `name` AS `string_value`
+ * -------^^^^^^^^^^^^^^^^^^^--------------------------
+ * FROM `testavro`.`level1`.`level2`.`SIMPLE1` AS `s`
+ * WHERE `s`.`id` = 1
+ *
+ * @param query sql query
+ * @param e Exception returned by Calcite
+ * @return formatted error string
+ */
+ public static String formatErrorString(String query, Exception e) {
+ Pattern pattern = Pattern.compile("line [0-9]+, column [0-9]+");
+ Matcher matcher = pattern.matcher(e.getMessage());
+ String[] queryLines = query.split("\\n");
+ StringBuilder result = new StringBuilder();
+ int startColIdx, endColIdx, startLineIdx, endLineIdx;
+
+ try {
+ if (matcher.find()) {
+ String match = matcher.group();
+ LOG.info(match);
+ startLineIdx = getIdxFromString(match, "line ");
+ startColIdx = getIdxFromString(match, "column ");
+ if (matcher.find()) {
+ match = matcher.group();
+ LOG.info(match);
+ endLineIdx = getIdxFromString(match, "line ");
+ endColIdx = getIdxFromString(match, "column ");
+ } else {
+ endColIdx = startColIdx;
+ endLineIdx = startLineIdx;
+ }
+ int lineLen = endLineIdx - startLineIdx;
+ int colLen = endColIdx - startColIdx + 1;
+
+ // Error spanning across multiple lines is not supported yet.
+ if (lineLen > 0) {
+ throw new SamzaException("lineLen formatting validation error: error
cannot span across multiple lines.");
+ }
+
+ int lineIdx = 0;
+ for (String line : queryLines) {
+ result.append(line)
+ .append("\n");
+ if (lineIdx == startLineIdx) {
+ String lineStr = getStringWithRepeatedChars('-', line.length() -
1);
+ String pointerStr = getStringWithRepeatedChars('^', colLen);
+ String errorMarkerStr =
+ new StringBuilder(lineStr).replace(startColIdx, endColIdx,
pointerStr).toString();
+ result.append(errorMarkerStr)
+ .append("\n");
+ }
+ lineIdx++;
+ }
+ }
+
+ String[] errorMsgParts = e.getMessage().split("Exception:");
+ result.append("\n")
+ .append(errorMsgParts[errorMsgParts.length - 1].trim());
+ return String.format("Sql syntax error:\n\n%s\n",
+ result);
+ } catch (Exception ex) {
+ // Ignore any formatting errors.
+ LOG.error("Formatting error (Not the actual error. Look for the logs for
actual error)", ex);
+ return String.format("Failed with formatting exception (not the actual
error) for the following sql"
+ + " statement:\n\"%s\"\n\n%s", query, e.getMessage());
+ }
+ }
+
+  /**
+   * Reads the first number that follows {@code delimiterStr} in {@code inputString} and returns it minus one,
+   * i.e. converts a 1-based position such as the "3" in "line 3, column 7" to a 0-based index.
+   *
+   * @param inputString string of the form "line L, column C"
+   * @param delimiterStr text (used as a regex by {@link String#split(String)}) that precedes the number
+   * @return the number found after the delimiter, minus one
+   */
+  private static int getIdxFromString(String inputString, String delimiterStr) {
+    String[] splitStr = inputString.split(delimiterStr);
+    // try-with-resources so that the Scanner is always closed, even if no number is found.
+    try (Scanner in = new Scanner(splitStr[1]).useDelimiter("[^0-9]+")) {
+      return in.nextInt() - 1;
+    }
+  }
+
+ private static String getStringWithRepeatedChars(char ch, int len) {
+ char[] chars = new char[len];
+ Arrays.fill(chars, ch);
+ return new String(chars);
+ }
+}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java
new file mode 100644
index 0000000..812508f
--- /dev/null
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+/**
+ * Checked Exception thrown while validating SQL statement.
+ */
+public class SamzaSqlValidatorException extends Exception {
+ public SamzaSqlValidatorException() {
+ }
+
+ public SamzaSqlValidatorException(String message) {
+ super(message);
+ }
+
+ public SamzaSqlValidatorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SamzaSqlValidatorException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 07aa6bb..c494382 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -308,6 +308,10 @@ public class SamzaSqlApplicationConfig {
return outputSystemStreamConfigsBySource;
}
+ public SqlIOConfig getOutputSqlIOConfig(String source) {
+ return outputSystemStreamConfigsBySource.get(source);
+ }
+
public Map<String, SamzaRelConverter> getSamzaRelConverters() {
return samzaRelConvertersBySource;
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index d2ed991..630d3f3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -53,6 +53,8 @@ import org.apache.calcite.tools.Planner;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.interfaces.SamzaSqlDriver;
import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+import org.apache.samza.sql.planner.QueryPlanner;
+import org.apache.samza.sql.planner.SamzaSqlValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +104,9 @@ public class SamzaSqlQueryParser {
try {
sqlNode = planner.parse(sql);
} catch (SqlParseException e) {
- throw new SamzaException(e);
+ String errorMsg = SamzaSqlValidator.formatErrorString(sql, e);
+ LOG.error(errorMsg, e);
+ throw new SamzaException(errorMsg, e);
}
String sink;
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
new file mode 100644
index 0000000..b2ce6f6
--- /dev/null
+++
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
@@ -0,0 +1,181 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+
+
+public class TestSamzaSqlValidator {
+
+ private final Map<String, String> configs = new HashMap<>();
+ private static final Logger LOG =
LoggerFactory.getLogger(TestSamzaSqlValidator.class);
+
+ @Before
+ public void setUp() {
+ configs.put("job.default.system", "kafka");
+ }
+
+ @Test
+ public void testBasicValidation() throws SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select id, name as string_value"
+ + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ @Test (expected = SamzaSqlValidatorException.class)
+ public void testNonExistingOutputField() throws SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select id, name as strings_value"
+ + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testNonExistingSelectField() throws SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select non_existing_field, name
as string_value"
+ + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+ SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+ }
+
+ @Test(expected = SamzaSqlValidatorException.class)
+ public void testCalciteErrorString() throws SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select non_existing_field, name
as string_value"
+ + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+
+ try {
+ SamzaSqlApplicationRunner.computeSamzaConfigs(true, new
MapConfig(config));
+ } catch (SamzaException e) {
+ Assert.assertTrue(e.getMessage().contains("line 1, column 8 to line 1,
column 27: Column 'non_existing_field' not found"));
+ throw new SamzaSqlValidatorException("Calcite planning for sql failed.",
e);
+ }
+ }
+
+ @Test (expected = SamzaException.class)
+ public void testNonExistingUdf() throws SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select NonExistingUdf(name) as
string_value"
+ + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ @Test (expected = SamzaSqlValidatorException.class)
+ public void testSelectAndOutputValidationFailure() throws
SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select name as long_value"
+ + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ @Test (expected = SamzaException.class)
+ public void testValidationStreamTableLeftJoinWithWhere() throws
SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)
select p.name as profileName, pv.pageKey"
+ + " from testavro.PAGEVIEW as pv left join
testavro.PROFILE.`$table` as p where p.id = pv.profileId";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+ @Test (expected = SamzaException.class)
+ public void testUnsupportedOperator() throws SamzaSqlValidatorException {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+ String sql =
+ "Insert into testavro.pageViewCountTopic(jobName, pageKey, `count`)"
+ + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
+ + " from testavro.PAGEVIEW as pv"
+ + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+ + " group bys (pv.pageKey)";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ }
+
+  /**
+   * Verifies that a Calcite error confined to a single line is rendered with the query, a caret marker below
+   * the offending columns and the trailing part of the exception message.
+   */
+  @Test
+  public void testFormatErrorString() {
+    String sql =
+        "select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n"
+            + "from testavro.PAGEVIEW as pv\n"
+            + "where pv.pageKey = 'job' or pv.pageKey = 'inbox'\n"
+            + "group bys (pv.pageKey)";
+    String errorStr =
+        "org.apache.calcite.tools.ValidationException: org.apache.calcite.runtime.CalciteContextException: "
+            + "From line 3, column 7 to line 3, column 16: Column 'pv.pageKey' not found in any table";
+    String formattedErrStr = SamzaSqlValidator.formatErrorString(sql, new Exception(errorStr));
+    LOG.info(formattedErrStr);
+
+    Assert.assertTrue(formattedErrStr.startsWith("Sql syntax error:"));
+    // Columns 7 to 16 of line 3 (the first 'pv.pageKey') must be underlined with carets.
+    Assert.assertTrue(formattedErrStr.contains(
+        "where pv.pageKey = 'job' or pv.pageKey = 'inbox'\n------^^^^^^^^^^--------------------------------\n"));
+    // Only the text after the last "Exception:" is reported.
+    Assert.assertTrue(formattedErrStr.contains(
+        "From line 3, column 7 to line 3, column 16: Column 'pv.pageKey' not found in any table"));
+    Assert.assertFalse(formattedErrStr.contains("CalciteContextException"));
+  }
+
+  /**
+   * Verifies the fallback output when the error cannot be formatted. An error that spans more than one line is
+   * not supported by the formatter, so it must return the raw query and the raw exception message.
+   */
+  @Test
+  public void testExceptionInFormatErrorString() {
+    String sql =
+        "select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n"
+            + "from testavro.PAGEVIEW as pv\n"
+            + "where pv.pageKey = 'job' or pv.pageKey = 'inbox'\n"
+            + "group bys (pv.pageKey)";
+    // The span starts on line 3 and ends on line 4, which triggers the formatting failure path.
+    String errorStr =
+        "org.apache.calcite.tools.ValidationException: org.apache.calcite.runtime.CalciteContextException: "
+            + "From line 3, column 7 to line 4, column 16: Column 'pv.pageKey' not found in any table";
+    String formattedErrStr = SamzaSqlValidator.formatErrorString(sql, new Exception(errorStr));
+    LOG.info(formattedErrStr);
+
+    Assert.assertTrue(formattedErrStr.startsWith("Failed with formatting exception (not the actual error)"));
+    Assert.assertTrue(formattedErrStr.contains(sql));
+    Assert.assertTrue(formattedErrStr.contains(errorStr));
+  }
+}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index ac84fe8..d81cb3c 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -32,7 +32,10 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.planner.SamzaSqlValidator;
+import org.apache.samza.sql.planner.SamzaSqlValidatorException;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
import org.apache.samza.sql.util.JsonUtil;
@@ -51,7 +54,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
private static final Logger LOG =
LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
@Test
- public void testEndToEnd() {
+ public void testEndToEnd() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -59,7 +62,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
@@ -70,7 +77,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndWithSystemMessages() {
+ public void testEndToEndWithSystemMessages() throws
SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -82,7 +89,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
@@ -92,7 +103,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndDisableSystemMessages() {
+ public void testEndToEndDisableSystemMessages() throws
SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -105,7 +116,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS,
"false");
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
@@ -115,7 +130,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndWithNullRecords() {
+ public void testEndToEndWithNullRecords() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -124,7 +139,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> x.getMessage() == null || ((GenericRecord)
x.getMessage()).get("id") == null ? null
@@ -141,7 +160,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndWithDifferentSystemSameStream() {
+ public void testEndToEndWithDifferentSystemSameStream() throws
SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -149,7 +168,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql = "Insert into testavro2.SIMPLE1 select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
@@ -160,7 +183,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndMultiSqlStmts() {
+ public void testEndToEndMultiSqlStmts() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -168,7 +191,12 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql2 = "Insert into testavro.SIMPLE3 select * from
testavro.SIMPLE2";
List<String> sqlStmts = Arrays.asList(sql1, sql2);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
.sorted()
@@ -180,7 +208,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
+ public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput()
throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -189,7 +217,12 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql1, sql2);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
.sorted()
@@ -201,7 +234,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndFanIn() {
+ public void testEndToEndFanIn() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -209,7 +242,12 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql2 = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1, sql2);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
.sorted()
@@ -221,7 +259,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndFanOut() {
+ public void testEndToEndFanOut() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -229,7 +267,12 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql2 = "Insert into testavro.SIMPLE3 select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1, sql2);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
.sorted()
@@ -250,7 +293,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(),
LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
@@ -270,14 +317,18 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " select * from testavro.COMPLEX1 where bool_value IS TRUE";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
Assert.assertEquals(numMessages / 2, outMessages.size());
}
@Test
- public void testEndToEndCompoundBooleanCheck() {
+ public void testEndToEndCompoundBooleanCheck() throws
SamzaSqlValidatorException {
int numMessages = 20;
@@ -287,14 +338,18 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " select * from testavro.COMPLEX1 where id >= 0 and bool_value IS
TRUE";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
Assert.assertEquals(numMessages / 2, outMessages.size());
}
@Test
- public void testEndToEndCompoundBooleanCheckWorkaround() {
+ public void testEndToEndCompoundBooleanCheckWorkaround() throws
SamzaSqlValidatorException {
int numMessages = 20;
@@ -305,7 +360,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " select * from testavro.COMPLEX1 where id >= 0 and CAST(bool_value
AS VARCHAR) = 'TRUE'";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
@@ -322,7 +381,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7)
THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL
END as string_value from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
@@ -342,7 +405,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " select id, name as string_value from testavro.SIMPLE1 where name
like 'Name%'";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
@@ -365,7 +432,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
@@ -379,18 +450,22 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
@Test
- public void testEndToEndComplexRecord() {
+ public void testEndToEndComplexRecord() throws SamzaSqlValidatorException {
int numMessages = 10;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
"Insert into testavro.outputTopic"
- + " select map_values['key0'] as string_value, union_value,
array_values[0] as string_value, map_values, id, bytes_value, fixed_value,
float_value "
- + " from testavro.COMPLEX1";
+ + " select map_values['key0'] as string_value, union_value,
array_values, map_values, id, bytes_value,"
+ + " fixed_value, float_value from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
@@ -399,7 +474,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
@Ignore
@Test
- public void testEndToEndNestedRecord() {
+ public void testEndToEndNestedRecord() throws SamzaSqlValidatorException {
int numMessages = 10;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -410,7 +485,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " from testavro.PROFILE as p";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
@@ -426,7 +505,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
"Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id))
as id from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
@@ -447,7 +530,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
"Insert into testavro.outputTopic(id) select Flatten(a) as id from
(select MyTestArray(id) a from testavro.SIMPLE1)";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
@@ -460,7 +547,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testUdfUnTypedArgumentToTypedUdf() {
+ public void testUdfUnTypedArgumentToTypedUdf() throws
SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -468,7 +555,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ "select id, MyTest(MyTestObj(id)) as long_value from
testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
LOG.info("output Messages " + TestAvroSystemFactory.messages);
@@ -488,7 +579,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ "select id, MYTest(id) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
LOG.info("output Messages " + TestAvroSystemFactory.messages);
@@ -532,7 +627,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from
testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
LOG.info("output Messages " + TestAvroSystemFactory.messages);
@@ -559,7 +658,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ "where RegexMatch('.*4', name)";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
LOG.info("output Messages " + TestAvroSystemFactory.messages);
// There should be two messages that contain "4"
@@ -582,7 +685,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -610,7 +717,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -638,7 +749,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -666,7 +781,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> {
@@ -700,7 +819,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -733,7 +856,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -763,7 +890,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -793,7 +924,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -825,7 +960,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -855,7 +994,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -885,7 +1028,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -918,7 +1065,11 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
// Let's capture the list of windows/counts per key.
HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>();
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index c2219e8..7eafd48 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -25,7 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.planner.SamzaSqlValidator;
+import org.apache.samza.sql.planner.SamzaSqlValidatorException;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
import org.apache.samza.sql.util.JsonUtil;
@@ -38,7 +41,7 @@ import org.junit.Test;
public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
@Test
- public void testSinkEndToEndWithKey() {
+ public void testSinkEndToEndWithKey() throws SamzaSqlValidatorException {
int numMessages = 20;
RemoteStoreIOResolverTestFactory.records.clear();
@@ -48,14 +51,18 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
String sql = "Insert into testRemoteStore.testTable.`$table` select
__key__, id, name from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
Assert.assertEquals(numMessages,
RemoteStoreIOResolverTestFactory.records.size());
}
@Test
@Ignore("Disabled due to flakiness related to data generation; Refer Pull
Request #905 for details")
- public void testSinkEndToEndWithKeyWithNullRecords() {
+ public void testSinkEndToEndWithKeyWithNullRecords() throws
SamzaSqlValidatorException {
int numMessages = 20;
RemoteStoreIOResolverTestFactory.records.clear();
@@ -68,13 +75,17 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
Assert.assertEquals(numMessages,
RemoteStoreIOResolverTestFactory.records.size());
}
@Test (expected = AssertionError.class)
- public void testSinkEndToEndWithoutKey() {
+ public void testSinkEndToEndWithoutKey() throws SamzaSqlValidatorException {
int numMessages = 20;
RemoteStoreIOResolverTestFactory.records.clear();
@@ -83,13 +94,17 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
String sql = "Insert into testRemoteStore.testTable.`$table`(id,name)
select id, name from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
Assert.assertEquals(numMessages,
RemoteStoreIOResolverTestFactory.records.size());
}
@Test
- public void testSourceEndToEndWithKey() {
+ public void testSourceEndToEndWithKey() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -107,7 +122,11 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -120,7 +139,7 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testSourceEndToEndWithKeyAndUdf() {
+ public void testSourceEndToEndWithKeyAndUdf() throws
SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -138,7 +157,11 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -151,7 +174,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
}
@Test
- public void testSourceEndToEndWithKeyWithNullForeignKeys() {
+ public void testSourceEndToEndWithKeyWithNullForeignKeys() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -170,7 +193,11 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -183,7 +210,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
}
@Test
- public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() {
+ public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -202,7 +229,11 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
@@ -215,7 +246,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
}
@Test
- public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
+ public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws SamzaSqlValidatorException {
int numMessages = 21;
TestAvroSystemFactory.messages.clear();
@@ -237,7 +268,11 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
- runApplication(new MapConfig(staticConfigs));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
Assert.assertEquals((numMessages + 1) / 2,
RemoteStoreIOResolverTestFactory.records.size());
}