srinipunuru commented on a change in pull request #1149: SAMZA-2316: Validate 
that all non-default value fields in output schema are set in the projected 
fields.
URL: https://github.com/apache/samza/pull/1149#discussion_r321390035
 
 

 ##########
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
 ##########
 @@ -98,32 +101,63 @@ protected void validate(RelRoot relRoot, 
SamzaSqlQueryParser.QueryInfo qinfo, Sa
   protected void validateOutput(RelRoot relRoot, RelSchemaProvider 
relSchemaProvider) throws SamzaSqlValidatorException {
     RelRecordType outputRecord = (RelRecordType) 
QueryPlanner.getSourceRelSchema(relSchemaProvider,
         new RelSchemaConverter());
+    // Get Samza Sql schema along with Calcite schema. The reason is that the 
Calcite schema does not have a way
+    // to represent fields with default values while Samza Sql schema can 
represent default value fields. This is
+    // the only reason that we use SqlSchema in validating output.
+    SqlSchema outputSqlSchema = 
QueryPlanner.getSourceSqlSchema(relSchemaProvider);
+
     LogicalProject project = (LogicalProject) relRoot.rel;
     RelRecordType projetRecord = (RelRecordType) project.getRowType();
-    validateOutputRecords(outputRecord, projetRecord);
+
+    validateOutputRecords(outputRecord, outputSqlSchema, projetRecord);
   }
 
-  protected void validateOutputRecords(RelRecordType outputRecord, 
RelRecordType projectRecord)
+  protected void validateOutputRecords(RelRecordType outputRecord, SqlSchema 
outputSqlSchema,
+      RelRecordType projectRecord)
       throws SamzaSqlValidatorException {
     Map<String, RelDataType> outputRecordMap = 
outputRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, 
RelDataTypeField::getType));
+    Map<String, SqlFieldSchema> outputFieldSchemaMap = 
outputSqlSchema.getFields().stream().collect(
+        Collectors.toMap(SqlSchema.SqlField::getFieldName, 
SqlSchema.SqlField::getFieldSchema));
     Map<String, RelDataType> projectRecordMap = 
projectRecord.getFieldList().stream().collect(
         Collectors.toMap(RelDataTypeField::getName, 
RelDataTypeField::getType));
 
-    // There could be default values for the output schema and hence fields in 
project schema could be a subset of
-    // fields in output schema.
-    // TODO: SAMZA-2316: Validate that all non-default value fields in output 
schema are set in the projected fields.
+    // Ensure that all non-default value fields in output schema are set in 
the projected fields and are of the
+    // same type.
+    for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) {
+      RelDataType projectFieldType = projectRecordMap.get(entry.getKey());
+      SqlFieldSchema outputSqlFieldSchema = 
outputFieldSchemaMap.get(entry.getKey());
+
+      if (projectFieldType == null) {
+        if (entry.getKey().equals(SamzaSqlRelMessage.KEY_NAME) || 
outputSqlFieldSchema.hasDefaultValue()) {
 
 Review comment:
   Can you add comments here on why we are special casing KEY_NAME field?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to