atoomula commented on a change in pull request #1148: SAMZA-2313: Adding
validation for Samza Sql statements.
URL: https://github.com/apache/samza/pull/1148#discussion_r320485923
##########
File path:
samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -87,49 +82,38 @@ public QueryPlanner(Map<String, RelSchemaProvider>
relSchemaProviders,
this.udfMetadata = udfMetadata;
}
+ private void fetchSourceSchema(SchemaPlus rootSchema) {
+ RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
+
+ for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
+ SchemaPlus previousLevelSchema = rootSchema;
+ List<String> sourceParts = ssc.getSourceParts();
+ RelSchemaProvider relSchemaProvider =
relSchemaProviders.get(ssc.getSource());
+
+ for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size();
sourcePartIndex++) {
+ String sourcePart = sourceParts.get(sourcePartIndex);
+ if (sourcePartIndex < sourceParts.size() - 1) {
+ SchemaPlus sourcePartSchema =
previousLevelSchema.getSubSchema(sourcePart);
+ if (sourcePartSchema == null) {
+ sourcePartSchema = previousLevelSchema.add(sourcePart, new
AbstractSchema());
+ }
+ previousLevelSchema = sourcePartSchema;
+ } else {
+ // If the source part is the last one, then fetch the schema
corresponding to the stream and register.
+ RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider,
relSchemaConverter);
+ previousLevelSchema.add(sourcePart,
createTableFromRelSchema(relationalSchema));
+ break;
+ }
+ }
+ }
+ }
+
public RelRoot plan(String query) {
try {
Connection connection = DriverManager.getConnection("jdbc:calcite:");
CalciteConnection calciteConnection =
connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
- RelSchemaConverter relSchemaConverter = new RelSchemaConverter();
-
- for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
- SchemaPlus previousLevelSchema = rootSchema;
- List<String> sourceParts = ssc.getSourceParts();
- RelSchemaProvider relSchemaProvider =
relSchemaProviders.get(ssc.getSource());
-
- for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size();
sourcePartIndex++) {
- String sourcePart = sourceParts.get(sourcePartIndex);
- if (sourcePartIndex < sourceParts.size() - 1) {
- SchemaPlus sourcePartSchema =
previousLevelSchema.getSubSchema(sourcePart);
- if (sourcePartSchema == null) {
- sourcePartSchema = previousLevelSchema.add(sourcePart, new
AbstractSchema());
- }
- previousLevelSchema = sourcePartSchema;
- } else {
- // If the source part is the last one, then fetch the schema
corresponding to the stream and register.
- SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
-
- List<String> fieldNames = new ArrayList<>();
- List<SqlFieldSchema> fieldTypes = new ArrayList<>();
- if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
- fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
-
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
- }
-
- fieldNames.addAll(
-
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
- fieldTypes.addAll(
-
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
-
- SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
- RelDataType relationalSchema =
relSchemaConverter.convertToRelSchema(newSchema);
- previousLevelSchema.add(sourcePart,
createTableFromRelSchema(relationalSchema));
- break;
- }
- }
- }
+ fetchSourceSchema(rootSchema);
Review comment:
> Looks great overall. It would be good if you could add a UT to protect the
string parsing logic.
That's a good idea. Added that test now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services