atoomula commented on a change in pull request #1148: SAMZA-2313: Adding 
validation for Samza Sql statements.
URL: https://github.com/apache/samza/pull/1148#discussion_r320485923
 
 

 ##########
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
 ##########
 @@ -87,49 +82,38 @@ public QueryPlanner(Map<String, RelSchemaProvider> 
relSchemaProviders,
     this.udfMetadata = udfMetadata;
   }
 
+  private void fetchSourceSchema(SchemaPlus rootSchema) {
+    RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
+
+    for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
+      SchemaPlus previousLevelSchema = rootSchema;
+      List<String> sourceParts = ssc.getSourceParts();
+      RelSchemaProvider relSchemaProvider = 
relSchemaProviders.get(ssc.getSource());
+
+      for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); 
sourcePartIndex++) {
+        String sourcePart = sourceParts.get(sourcePartIndex);
+        if (sourcePartIndex < sourceParts.size() - 1) {
+          SchemaPlus sourcePartSchema = 
previousLevelSchema.getSubSchema(sourcePart);
+          if (sourcePartSchema == null) {
+            sourcePartSchema = previousLevelSchema.add(sourcePart, new 
AbstractSchema());
+          }
+          previousLevelSchema = sourcePartSchema;
+        } else {
+          // If the source part is the last one, then fetch the schema 
corresponding to the stream and register.
+          RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider, 
relSchemaConverter);
+          previousLevelSchema.add(sourcePart, 
createTableFromRelSchema(relationalSchema));
+          break;
+        }
+      }
+    }
+  }
+
   public RelRoot plan(String query) {
     try {
       Connection connection = DriverManager.getConnection("jdbc:calcite:");
       CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
       SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
-
-      for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
-        SchemaPlus previousLevelSchema = rootSchema;
-        List<String> sourceParts = ssc.getSourceParts();
-        RelSchemaProvider relSchemaProvider = 
relSchemaProviders.get(ssc.getSource());
-
-        for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); 
sourcePartIndex++) {
-          String sourcePart = sourceParts.get(sourcePartIndex);
-          if (sourcePartIndex < sourceParts.size() - 1) {
-            SchemaPlus sourcePartSchema = 
previousLevelSchema.getSubSchema(sourcePart);
-            if (sourcePartSchema == null) {
-              sourcePartSchema = previousLevelSchema.add(sourcePart, new 
AbstractSchema());
-            }
-            previousLevelSchema = sourcePartSchema;
-          } else {
-            // If the source part is the last one, then fetch the schema 
corresponding to the stream and register.
-            SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
-
-            List<String> fieldNames = new ArrayList<>();
-            List<SqlFieldSchema> fieldTypes = new ArrayList<>();
-            if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
-              fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
-              
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
-            }
-
-            fieldNames.addAll(
-                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
-            fieldTypes.addAll(
-                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
-
-            SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
-            RelDataType relationalSchema = 
relSchemaConverter.convertToRelSchema(newSchema);
-            previousLevelSchema.add(sourcePart, 
createTableFromRelSchema(relationalSchema));
-            break;
-          }
-        }
-      }
+      fetchSourceSchema(rootSchema);
 
 Review comment:
   > Looks great overall. It would be good if you could add a UT to protect the 
string parsing logic.
   
   That's a good idea. Added that test now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to