srinipunuru commented on a change in pull request #1149: SAMZA-2316: Validate
that all non-default value fields in output schema are set in the projected
fields.
URL: https://github.com/apache/samza/pull/1149#discussion_r321390035
##########
File path:
samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
##########
@@ -98,32 +101,63 @@ protected void validate(RelRoot relRoot, SamzaSqlQueryParser.QueryInfo qinfo, Sa
protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException {
RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider,
new RelSchemaConverter());
+ // Get Samza Sql schema along with Calcite schema. The reason is that the Calcite schema does not have a way
+ // to represent fields with default values while Samza Sql schema can represent default value fields. This is
+ // the only reason that we use SqlSchema in validating output.
+ SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+
LogicalProject project = (LogicalProject) relRoot.rel;
RelRecordType projetRecord = (RelRecordType) project.getRowType();
- validateOutputRecords(outputRecord, projetRecord);
+
+ validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
}
- protected void validateOutputRecords(RelRecordType outputRecord, RelRecordType projectRecord)
+ protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema outputSqlSchema,
+ RelRecordType projectRecord)
throws SamzaSqlValidatorException {
Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
+ Map<String, SqlFieldSchema> outputFieldSchemaMap = outputSqlSchema.getFields().stream().collect(
+ Collectors.toMap(SqlSchema.SqlField::getFieldName, SqlSchema.SqlField::getFieldSchema));
Map<String, RelDataType> projectRecordMap = projectRecord.getFieldList().stream().collect(
Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType));
- // There could be default values for the output schema and hence fields in project schema could be a subset of
- // fields in output schema.
- // TODO: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields.
+ // Ensure that all non-default value fields in output schema are set in the projected fields and are of the
+ // same type.
+ for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
+ RelDataType projectFieldType = projectRecordMap.get(entry.getKey());
+ SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey());
+
+ if (projectFieldType == null) {
+ if (entry.getKey().equals(SamzaSqlRelMessage.KEY_NAME) || outputSqlFieldSchema.hasDefaultValue()) {
Review comment:
Can you add a comment here explaining why the KEY_NAME field is special-cased?
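For context, the rule this hunk appears to implement can be sketched in isolation. This is only an illustration under stated assumptions, not the code in the PR: plain Java maps and sets stand in for the Calcite and Samza SQL schema types, the class and method names (MissingFieldCheck, findUnsetRequiredFields) are hypothetical, the literal used for KEY_NAME is assumed, and the behavior after the truncated `if` (skip the field, otherwise report it) is inferred from the code comment in the diff. The sketch does not explain why the key field is exempt, which is what the comment above asks for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MissingFieldCheck {
  // Stand-in for SamzaSqlRelMessage.KEY_NAME; the literal value is an assumption.
  static final String KEY_NAME = "__key__";

  /**
   * Returns the output-schema fields that are absent from the projection and
   * are neither the key field nor backed by a default value.
   *
   * @param outputFieldHasDefault output field name -> whether it has a default value
   * @param projectedFields names of the fields produced by the query's projection
   */
  public static List<String> findUnsetRequiredFields(
      Map<String, Boolean> outputFieldHasDefault, Set<String> projectedFields) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, Boolean> entry : outputFieldHasDefault.entrySet()) {
      String name = entry.getKey();
      if (projectedFields.contains(name)) {
        continue; // field is set by the projection (type checking is omitted here)
      }
      if (name.equals(KEY_NAME) || entry.getValue()) {
        continue; // tolerated: key field, or the schema supplies a default
      }
      missing.add(name);
    }
    return missing;
  }

  public static void main(String[] args) {
    Map<String, Boolean> output = new LinkedHashMap<>();
    output.put("id", false);
    output.put("name", false);
    output.put("count", true);   // has a default value
    output.put(KEY_NAME, false); // key field, exempt from the check
    Set<String> projected = new HashSet<>(Arrays.asList("id"));
    System.out.println(findUnsetRequiredFields(output, projected)); // prints [name]
  }
}
```

In this sketch only `name` is reported: `count` is covered by its default and the key field is skipped unconditionally, mirroring the `||` condition in the diff.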