This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 713a8bf SAMZA-2316: Validate that all non-default value fields in
output schema are set in the projected fields. (#1149)
713a8bf is described below
commit 713a8bf68c2f4f81f7c575f66b1cf091ad81113c
Author: Aditya Toomula <[email protected]>
AuthorDate: Thu Sep 5 21:47:46 2019 -0700
SAMZA-2316: Validate that all non-default value fields in output schema are
set in the projected fields. (#1149)
* Validate that all non-default value fields in output schema are set in
the projected fields.
* Added comments and renamed sqlFieldSchema member variables.
---
.../apache/samza/sql/schema/SqlFieldSchema.java | 57 ++++++++++++++++------
.../samza/sql/client/impl/SamzaExecutor.java | 6 +--
.../apache/samza/sql/avro/AvroTypeFactoryImpl.java | 49 ++++++++++---------
.../org/apache/samza/sql/planner/QueryPlanner.java | 16 +++---
.../samza/sql/planner/RelSchemaConverter.java | 22 ++++-----
.../samza/sql/planner/SamzaSqlValidator.java | 55 ++++++++++++++++++---
.../samza/sql/avro/schemas/ComplexRecord.avsc | 13 +++--
.../samza/sql/avro/schemas/ComplexRecord.java | 8 ++-
.../samza/sql/planner/TestSamzaSqlValidator.java | 45 +++++++++++++++--
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 36 +++++++-------
10 files changed, 213 insertions(+), 94 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
index b944011..d3cec05 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlFieldSchema.java
@@ -24,44 +24,59 @@ package org.apache.samza.sql.schema;
*/
public class SqlFieldSchema {
- private SamzaSqlFieldType fieldType;
- private SqlFieldSchema elementType;
- private SqlFieldSchema valueType;
- private SqlSchema rowSchema;
+ private final SamzaSqlFieldType fieldType;
+ private final SqlFieldSchema elementType;
+ private final SqlFieldSchema valueType;
+ private final SqlSchema rowSchema;
+ // A field is considered nullable when the field could have a null value. Please note that nullable field
+ // needs to be explicitly set while writing and is expected to be set while reading. A non-nullable field
+ // cannot have a null value.
+ private final Boolean isNullable;
+ // A field is considered optional when the field has a default value. Such a field need not be set while writing
+ // but is expected to be set while reading.
+ // Please note that nullable field is also optional field if a default value is set but the value for
+ // nullable non-optional field need to be explicitly set.
+ private final Boolean isOptional;
- private SqlFieldSchema(SamzaSqlFieldType fieldType, SqlFieldSchema
elementType, SqlFieldSchema valueType, SqlSchema rowSchema) {
+ private SqlFieldSchema(SamzaSqlFieldType fieldType, SqlFieldSchema
elementType, SqlFieldSchema valueType,
+ SqlSchema rowSchema, boolean isNullable, boolean isOptional) {
this.fieldType = fieldType;
this.elementType = elementType;
this.valueType = valueType;
this.rowSchema = rowSchema;
+ this.isNullable = isNullable;
+ this.isOptional = isOptional;
}
/**
- * Create a primitive fi
+ * Create a primitive field schema.
* @param typeName
* @return
*/
- public static SqlFieldSchema createPrimitiveSchema(SamzaSqlFieldType
typeName) {
- return new SqlFieldSchema(typeName, null, null, null);
+ public static SqlFieldSchema createPrimitiveSchema(SamzaSqlFieldType
typeName, boolean isNullable,
+ boolean isOptional) {
+ return new SqlFieldSchema(typeName, null, null, null, isNullable,
isOptional);
}
- public static SqlFieldSchema createArraySchema(SqlFieldSchema elementType) {
- return new SqlFieldSchema(SamzaSqlFieldType.ARRAY, elementType, null,
null);
+ public static SqlFieldSchema createArraySchema(SqlFieldSchema elementType,
boolean isNullable,
+ boolean isOptional) {
+ return new SqlFieldSchema(SamzaSqlFieldType.ARRAY, elementType, null,
null, isNullable, isOptional);
}
- public static SqlFieldSchema createMapSchema(SqlFieldSchema valueType) {
- return new SqlFieldSchema(SamzaSqlFieldType.MAP, null, valueType, null);
+ public static SqlFieldSchema createMapSchema(SqlFieldSchema valueType,
boolean isNullable, boolean isOptional) {
+ return new SqlFieldSchema(SamzaSqlFieldType.MAP, null, valueType, null,
isNullable, isOptional);
}
- public static SqlFieldSchema createRowFieldSchema(SqlSchema rowSchema) {
- return new SqlFieldSchema(SamzaSqlFieldType.ROW, null, null, rowSchema);
+ public static SqlFieldSchema createRowFieldSchema(SqlSchema rowSchema,
boolean isNullable, boolean isOptional) {
+ return new SqlFieldSchema(SamzaSqlFieldType.ROW, null, null, rowSchema,
isNullable, isOptional);
}
/**
* @return whether the field is a primitive field type or not.
*/
public boolean isPrimitiveField() {
- return fieldType != SamzaSqlFieldType.ARRAY && fieldType !=
SamzaSqlFieldType.MAP && fieldType != SamzaSqlFieldType.ROW;
+ return fieldType != SamzaSqlFieldType.ARRAY && fieldType !=
SamzaSqlFieldType.MAP &&
+ fieldType != SamzaSqlFieldType.ROW;
}
/**
@@ -93,5 +108,17 @@ public class SqlFieldSchema {
return rowSchema;
}
+ /**
+ * Get if the field type is nullable.
+ */
+ public boolean isNullable() {
+ return isNullable;
+ }
+ /**
+ * Get if the field type is optional.
+ */
+ public boolean isOptional() {
+ return isOptional;
+ }
}
diff --git
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index db97516..5939adb 100755
---
a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++
b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -293,9 +293,9 @@ public class SamzaExecutor implements SqlExecutor {
*/
List<SqlFunction> udfs = new ArrayList<>();
udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to
the regex",
-
Arrays.asList(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING),
-
SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING)),
- SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN)));
+
Arrays.asList(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING,
false, false),
+ SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING,
false, false)),
+ SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, false,
false)));
return udfs;
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 68116b6..4c06938 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
@@ -33,8 +32,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Factory that creates the Calcite relational types from the Avro Schema. This is used by the
- * AvroRelConverter to convert the Avro schema to calcite relational schema.
+ * Factory that creates {@link SqlSchema} from the Avro Schema. This is used by the
+ * {@link AvroRelConverter} to convert Avro schema to Samza Sql schema.
*/
public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
@@ -60,7 +59,8 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
for (Schema.Field field : fields) {
- SqlFieldSchema fieldSchema = convertField(field.schema());
+ boolean isOptional = (field.defaultValue() != null);
+ SqlFieldSchema fieldSchema = convertField(field.schema(), false,
isOptional);
schemaBuilder.addField(field.name(), fieldSchema);
}
@@ -68,36 +68,41 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl
{
}
private SqlFieldSchema convertField(Schema fieldSchema) {
+ return convertField(fieldSchema, false, false);
+ }
+
+ private SqlFieldSchema convertField(Schema fieldSchema, boolean isNullable,
boolean isOptional) {
switch (fieldSchema.getType()) {
case ARRAY:
SqlFieldSchema elementSchema =
convertField(fieldSchema.getElementType());
- return SqlFieldSchema.createArraySchema(elementSchema);
+ return SqlFieldSchema.createArraySchema(elementSchema, isNullable,
isOptional);
case BOOLEAN:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN,
isNullable, isOptional);
case DOUBLE:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.DOUBLE);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.DOUBLE,
isNullable, isOptional);
case FLOAT:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.FLOAT);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.FLOAT,
isNullable, isOptional);
case ENUM:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING,
isNullable, isOptional);
case UNION:
- return getSqlTypeFromUnionTypes(fieldSchema.getTypes());
+ return getSqlTypeFromUnionTypes(fieldSchema.getTypes(), isNullable,
isOptional);
case FIXED:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES,
isNullable, isOptional);
case STRING:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING,
isNullable, isOptional);
case BYTES:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BYTES,
isNullable, isOptional);
case INT:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT32);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT32,
isNullable, isOptional);
case LONG:
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.INT64,
isNullable, isOptional);
case RECORD:
SqlSchema rowSchema = convertSchema(fieldSchema.getFields());
- return SqlFieldSchema.createRowFieldSchema(rowSchema);
+ return SqlFieldSchema.createRowFieldSchema(rowSchema, isNullable,
isOptional);
case MAP:
- SqlFieldSchema valueType = convertField(fieldSchema.getValueType());
- return SqlFieldSchema.createMapSchema(valueType);
+ // Can the value type be nullable and have default values ? Guess not!
+ SqlFieldSchema valueType = convertField(fieldSchema.getValueType(),
false, false);
+ return SqlFieldSchema.createMapSchema(valueType, isNullable,
isOptional);
default:
String msg = String.format("Field Type %s is not supported",
fieldSchema.getType());
LOG.error(msg);
@@ -105,17 +110,17 @@ public class AvroTypeFactoryImpl extends
SqlTypeFactoryImpl {
}
}
- private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types) {
+ private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types, boolean
isNullable, boolean isOptional) {
// Typically a nullable field's schema is configured as an union of Null
and a Type.
// This is to check whether the Union is a Nullable field
if (types.size() == 2) {
if (types.get(0).getType() == Schema.Type.NULL) {
- return convertField(types.get(1));
+ return convertField(types.get(1), true, isOptional);
} else if ((types.get(1).getType() == Schema.Type.NULL)) {
- return convertField(types.get(0));
+ return convertField(types.get(0), true, isOptional);
}
}
- return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY);
+ return SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY,
isNullable, isOptional);
}
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index bdf03f7..83ccea0 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -152,16 +152,15 @@ public class QueryPlanner {
}
}
- public static RelDataType getSourceRelSchema(RelSchemaProvider
relSchemaProvider,
- RelSchemaConverter relSchemaConverter) {
- // If the source part is the last one, then fetch the schema corresponding
to the stream and register.
+ public static SqlSchema getSourceSqlSchema(RelSchemaProvider
relSchemaProvider) {
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
List<String> fieldNames = new ArrayList<>();
List<SqlFieldSchema> fieldTypes = new ArrayList<>();
if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
-
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
+ // Key is a nullable and optional field. It is defaulted to null in
SamzaSqlRelMessage.
+
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY,
true, true));
}
fieldNames.addAll(
@@ -169,8 +168,13 @@ public class QueryPlanner {
fieldTypes.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
- SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
- return relSchemaConverter.convertToRelSchema(newSchema);
+ return new SqlSchema(fieldNames, fieldTypes);
+ }
+
+ public static RelDataType getSourceRelSchema(RelSchemaProvider
relSchemaProvider,
+ RelSchemaConverter relSchemaConverter) {
+ // If the source part is the last one, then fetch the schema corresponding
to the stream and register.
+ return
relSchemaConverter.convertToRelSchema(getSourceSqlSchema(relSchemaProvider));
}
private static Table createTableFromRelSchema(RelDataType relationalSchema) {
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index 6634f5a..dbd74b6 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.sql.type.ArraySqlType;
+import org.apache.calcite.sql.type.MapSqlType;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.samza.SamzaException;
@@ -73,30 +74,29 @@ public class RelSchemaConverter extends SqlTypeFactoryImpl {
switch (fieldSchema.getFieldType()) {
case ARRAY:
RelDataType elementType =
getRelDataType(fieldSchema.getElementSchema());
- return new ArraySqlType(elementType, true);
+ return new ArraySqlType(elementType, fieldSchema.isNullable());
case BOOLEAN:
- return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN),
true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.BOOLEAN),
fieldSchema.isNullable());
case DOUBLE:
- return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE),
true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.DOUBLE),
fieldSchema.isNullable());
case FLOAT:
- return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT),
true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.FLOAT),
fieldSchema.isNullable());
case STRING:
- return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR),
true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR),
fieldSchema.isNullable());
case BYTES:
- return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY),
true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.VARBINARY),
fieldSchema.isNullable());
case INT16:
case INT32:
- return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER),
true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.INTEGER),
fieldSchema.isNullable());
case INT64:
- return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT),
true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT),
fieldSchema.isNullable());
case ROW:
case ANY:
// TODO Calcite execution engine doesn't support record type yet.
- return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
+ return createTypeWithNullability(createSqlType(SqlTypeName.ANY),
fieldSchema.isNullable());
case MAP:
RelDataType valueType = getRelDataType(fieldSchema.getValueScehma());
- return
super.createMapType(createTypeWithNullability(createSqlType(SqlTypeName.VARCHAR),
true),
- createTypeWithNullability(valueType, true));
+ return new MapSqlType(createSqlType(SqlTypeName.VARCHAR), valueType,
fieldSchema.isNullable());
default:
String msg = String.format("Field Type %s is not supported",
fieldSchema.getFieldType());
LOG.error(msg);
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index 08d4497..9482a75 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -40,6 +40,8 @@ import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.schema.SqlFieldSchema;
+import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,24 +100,59 @@ public class SamzaSqlValidator {
protected void validateOutput(RelRoot relRoot, RelSchemaProvider
relSchemaProvider) throws SamzaSqlValidatorException {
RelRecordType outputRecord = (RelRecordType)
QueryPlanner.getSourceRelSchema(relSchemaProvider,
new RelSchemaConverter());
+ // Get Samza Sql schema along with Calcite schema. The reason is that the Calcite schema does not have a way
+ // to represent optional fields while Samza Sql schema can represent optional fields. This is the only reason that
+ // we use SqlSchema in validating output.
+ SqlSchema outputSqlSchema =
QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+
LogicalProject project = (LogicalProject) relRoot.rel;
RelRecordType projetRecord = (RelRecordType) project.getRowType();
- validateOutputRecords(outputRecord, projetRecord);
+
+ validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
}
- protected void validateOutputRecords(RelRecordType outputRecord,
RelRecordType projectRecord)
+ protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema
outputSqlSchema,
+ RelRecordType projectRecord)
throws SamzaSqlValidatorException {
Map<String, RelDataType> outputRecordMap =
outputRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName,
RelDataTypeField::getType));
+ Map<String, SqlFieldSchema> outputFieldSchemaMap =
outputSqlSchema.getFields().stream().collect(
+ Collectors.toMap(SqlSchema.SqlField::getFieldName,
SqlSchema.SqlField::getFieldSchema));
Map<String, RelDataType> projectRecordMap =
projectRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName,
RelDataTypeField::getType));
- // There could be default values for the output schema and hence fields in
project schema could be a subset of
- // fields in output schema.
- // TODO: SAMZA-2316: Validate that all non-default value fields in output
schema are set in the projected fields.
+ // Ensure that all non-optional fields in output schema are set in the sql query and are of the
+ // same type.
+ for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
+ RelDataType projectFieldType = projectRecordMap.get(entry.getKey());
+ SqlFieldSchema outputSqlFieldSchema =
outputFieldSchemaMap.get(entry.getKey());
+
+ if (projectFieldType == null) {
+ // If an output schema field is not found in the sql query, ignore it if the field is optional.
+ // Otherwise, throw an error.
+ if (outputSqlFieldSchema.isOptional()) {
+ continue;
+ }
+ String errMsg = String.format("Field '%s' in output schema does not
match any projected fields.",
+ entry.getKey());
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ } else if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema,
projectFieldType)) {
+ String errMsg = String.format("Field '%s' with type '%s' in output
schema does not match the field type '%s' in"
+ + " projected fields.", entry.getKey(), entry.getValue(),
projectFieldType);
+ LOG.error(errMsg);
+ throw new SamzaSqlValidatorException(errMsg);
+ }
+ }
+
+ // Ensure that all fields from sql statement exist in the output schema
and are of the same type.
for (Map.Entry<String, RelDataType> entry : projectRecordMap.entrySet()) {
RelDataType outputFieldType = outputRecordMap.get(entry.getKey());
+ SqlFieldSchema outputSqlFieldSchema =
outputFieldSchemaMap.get(entry.getKey());
+
if (outputFieldType == null) {
+ // If a field in sql query is not found in the output schema, ignore
if it is a Samza Sql special op.
+ // Otherwise, throw an error.
if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) {
continue;
}
@@ -123,7 +160,7 @@ public class SamzaSqlValidator {
entry.getKey());
LOG.error(errMsg);
throw new SamzaSqlValidatorException(errMsg);
- } else if (!compareFieldTypes(outputFieldType, entry.getValue())) {
+ } else if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema,
entry.getValue())) {
String errMsg = String.format("Field '%s' with type '%s' in select
query does not match the field type '%s' in"
+ " output schema.", entry.getKey(), entry.getValue(),
outputFieldType);
LOG.error(errMsg);
@@ -132,7 +169,8 @@ public class SamzaSqlValidator {
}
}
- protected boolean compareFieldTypes(RelDataType outputFieldType, RelDataType
selectQueryFieldType) {
+ protected boolean compareFieldTypes(RelDataType outputFieldType,
SqlFieldSchema sqlFieldSchema,
+ RelDataType selectQueryFieldType) {
RelDataType projectFieldType;
// JavaTypes are relevant for Udf argument and return types
@@ -168,7 +206,8 @@ public class SamzaSqlValidator {
return projectSqlType == SqlTypeName.FLOAT;
case ROW:
try {
- validateOutputRecords((RelRecordType) outputFieldType,
(RelRecordType) projectFieldType);
+ validateOutputRecords((RelRecordType) outputFieldType,
sqlFieldSchema.getRowSchema(),
+ (RelRecordType) projectFieldType);
} catch (SamzaSqlValidatorException e) {
LOG.error("A field in select query does not match with the output
schema.", e);
return false;
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
index 5e78bd9..c307b10 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
+++
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
@@ -26,14 +26,12 @@
{
"name": "id",
"doc": "Record id.",
- "type": ["null", "int"],
- "default":null
+ "type": "int"
},
{
"name": "bool_value",
"doc": "Boolean Value.",
- "type": ["null", "boolean"],
- "default":null
+ "type": ["null", "boolean"]
},
{
"name": "double_value",
@@ -72,8 +70,8 @@
"name": "MyFixed",
"type":"fixed",
"size":16
- }
- ]
+ }],
+ "default":null
},
{
"name": "array_values",
@@ -120,7 +118,8 @@
"doc" : "",
"fields" : [ ]
}
- ]
+ ],
+ "default":null
},
{
"name": "array_records",
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index 7796004..91a447f 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -16,11 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
package org.apache.samza.sql.avro.schemas;
@SuppressWarnings("all")
public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ =
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record
id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean
Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double
Value.\",\"default\":null},{\"na [...]
+ public static final org.apache.avro.Schema SCHEMA$ =
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Record
id.\"},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean
Value.\"},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double
Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"flo
[...]
/** Record id. */
public java.lang.Integer id;
/** Boolean Value. */
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
index b2ce6f6..4c9522e 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java
@@ -22,14 +22,11 @@ package org.apache.samza.sql.planner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.calcite.rel.RelRoot;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.junit.Assert;
import org.junit.Before;
@@ -54,7 +51,7 @@ public class TestSamzaSqlValidator {
public void testBasicValidation() throws SamzaSqlValidatorException {
Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
- "Insert into testavro.outputTopic(id) select id, name as string_value"
+ "Insert into testavro.outputTopic select id, true as bool_value, name
as string_value"
+ " from testavro.level1.level2.SIMPLE1 as s where s.id = 1");
Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
@@ -152,6 +149,46 @@ public class TestSamzaSqlValidator {
}
@Test
+ public void testNonDefaultButNullableField() {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ // bool_value is missing
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+ "Insert into testavro.outputTopic(id) select Flatten(a) as id from
(select MyTestArray(id) a from testavro.SIMPLE1)");
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ try {
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ } catch (SamzaSqlValidatorException e) {
+ Assert.assertTrue(e.getMessage().contains("Field 'bool_value' in output
schema does not match any projected fields."));
+ return;
+ }
+
+ Assert.fail("Validation test has failed.");
+ }
+
+ @Test
+ public void testNonDefaultOutputField() {
+ Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+ // id is non-default field.
+ String sql = "Insert into testavro.outputTopic "
+ + " select NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) THEN
CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL END
as string_value from testavro.SIMPLE1";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+
+ try {
+ new SamzaSqlValidator(samzaConfig).validate(sqlStmts);
+ } catch (SamzaSqlValidatorException e) {
+ Assert.assertTrue(e.getMessage().contains("Field 'id' in output schema
does not match"));
+ return;
+ }
+
+ Assert.fail("Validation test has failed.");
+ }
+
+ @Test
public void testFormatErrorString() {
String sql =
"select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n"
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index d81cb3c..dec886e 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -289,8 +289,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(),
LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value,
long_value) "
+ + " select id, NOT(id = 5) as bool_value, TIMESTAMPDIFF(HOUR,
CURRENT_TIMESTAMP(), LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value
from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -401,8 +401,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + " select id, name as string_value from testavro.SIMPLE1 where name
like 'Name%'";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value,
string_value) "
+ + " select id, NOT(id = 5) as bool_value, name as string_value from
testavro.SIMPLE1 where name like 'Name%'";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -427,8 +427,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
LOG.info(" Class Path : " +
RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
String sql1 =
- "Insert into testavro.outputTopic(string_value, id, bytes_value,
fixed_value, float_value) "
- + " select Flatten(array_values) as string_value, id, bytes_value,
fixed_value, float_value "
+ "Insert into testavro.outputTopic(string_value, id, bool_value,
bytes_value, fixed_value, float_value) "
+ + " select Flatten(array_values) as string_value, id, NOT(id = 5)
as bool_value, bytes_value, fixed_value, float_value "
+ " from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -457,7 +457,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
String sql1 =
"Insert into testavro.outputTopic"
- + " select map_values['key0'] as string_value, union_value,
array_values, map_values, id, bytes_value,"
+ + " select bool_value, map_values['key0'] as string_value,
union_value, array_values, map_values, id, bytes_value,"
+ " fixed_value, float_value from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -502,7 +502,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id))
as id from testavro.SIMPLE1";
+ "Insert into testavro.outputTopic(id, bool_value) select
Flatten(MyTestArray(id)) as id, NOT(id = 5) as bool_value"
+ + " from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -527,7 +528,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic(id) select Flatten(a) as id from
(select MyTestArray(id) a from testavro.SIMPLE1)";
+ "Insert into testavro.outputTopic(id, bool_value) select Flatten(a) as
id, true as bool_value"
+ + " from (select MyTestArray(id) a from testavro.SIMPLE1)";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -551,8 +553,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + "select id, MyTest(MyTestObj(id)) as long_value from
testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value,
long_value) "
+ + "select id, NOT(id = 5) as bool_value, MyTest(MyTestObj(id)) as
long_value from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -575,8 +577,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + "select id, MYTest(id) as long_value from testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value,
long_value) "
+ + "select id, NOT(id = 5) as bool_value, MYTest(id) as long_value from
testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -623,8 +625,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from
testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id, bool_value,
long_value) "
+ + "select MyTestPoly(id) as long_value, NOT(id = 5) as bool_value,
MyTestPoly(name) as id from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -652,8 +654,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic(id) "
- + "select id "
+ "Insert into testavro.outputTopic(id, bool_value) "
+ + "select id, NOT(id = 5) as bool_value "
+ "from testavro.SIMPLE1 "
+ "where RegexMatch('.*4', name)";
List<String> sqlStmts = Collections.singletonList(sql1);