srinipunuru commented on a change in pull request #1148: SAMZA-2313: Adding 
validation for Samza Sql statements.
URL: https://github.com/apache/samza/pull/1148#discussion_r320377388
 
 

 ##########
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
 ##########
 @@ -87,49 +82,38 @@ public QueryPlanner(Map<String, RelSchemaProvider> 
relSchemaProviders,
     this.udfMetadata = udfMetadata;
   }
 
+  private void fetchSourceSchema(SchemaPlus rootSchema) {
+    RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
+
+    for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
+      SchemaPlus previousLevelSchema = rootSchema;
+      List<String> sourceParts = ssc.getSourceParts();
+      RelSchemaProvider relSchemaProvider = 
relSchemaProviders.get(ssc.getSource());
+
+      for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); 
sourcePartIndex++) {
+        String sourcePart = sourceParts.get(sourcePartIndex);
+        if (sourcePartIndex < sourceParts.size() - 1) {
+          SchemaPlus sourcePartSchema = 
previousLevelSchema.getSubSchema(sourcePart);
+          if (sourcePartSchema == null) {
+            sourcePartSchema = previousLevelSchema.add(sourcePart, new 
AbstractSchema());
+          }
+          previousLevelSchema = sourcePartSchema;
+        } else {
+          // If the source part is the last one, then fetch the schema 
corresponding to the stream and register.
+          RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider, 
relSchemaConverter);
+          previousLevelSchema.add(sourcePart, 
createTableFromRelSchema(relationalSchema));
+          break;
+        }
+      }
+    }
+  }
+
   public RelRoot plan(String query) {
     try {
       Connection connection = DriverManager.getConnection("jdbc:calcite:");
       CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
       SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
-
-      for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
-        SchemaPlus previousLevelSchema = rootSchema;
-        List<String> sourceParts = ssc.getSourceParts();
-        RelSchemaProvider relSchemaProvider = 
relSchemaProviders.get(ssc.getSource());
-
-        for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); 
sourcePartIndex++) {
-          String sourcePart = sourceParts.get(sourcePartIndex);
-          if (sourcePartIndex < sourceParts.size() - 1) {
-            SchemaPlus sourcePartSchema = 
previousLevelSchema.getSubSchema(sourcePart);
-            if (sourcePartSchema == null) {
-              sourcePartSchema = previousLevelSchema.add(sourcePart, new 
AbstractSchema());
-            }
-            previousLevelSchema = sourcePartSchema;
-          } else {
-            // If the source part is the last one, then fetch the schema 
corresponding to the stream and register.
-            SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
-
-            List<String> fieldNames = new ArrayList<>();
-            List<SqlFieldSchema> fieldTypes = new ArrayList<>();
-            if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
-              fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
-              
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
-            }
-
-            fieldNames.addAll(
-                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
-            fieldTypes.addAll(
-                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
-
-            SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
-            RelDataType relationalSchema = 
relSchemaConverter.convertToRelSchema(newSchema);
-            previousLevelSchema.add(sourcePart, 
createTableFromRelSchema(relationalSchema));
-            break;
-          }
-        }
-      }
+      fetchSourceSchema(rootSchema);
 
 Review comment:
   nit: Can we call this as registerSourceSchemas? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to