This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f44b4f7  Fix AvroRelConverter to only consider cached schema while populating SamzaSqlRelRecord for all the nested records.
     new fb20518  Merge pull request #1233 from atoomula/schema_fix
f44b4f7 is described below

commit f44b4f7900f1e777f0845431db69003a221c7dd2
Author: Aditya Toomula <[email protected]>
AuthorDate: Mon Dec 9 09:50:29 2019 -0800

    Fix AvroRelConverter to only consider cached schema while populating SamzaSqlRelRecord for all the nested records.
---
 .../java/org/apache/samza/sql/avro/AvroRelConverter.java    | 13 +++----------
 .../java/org/apache/samza/sql/interfaces/SqlIOConfig.java   |  6 ++----
 .../java/org/apache/samza/sql/util/SamzaSqlQueryParser.java |  1 -
 3 files changed, 5 insertions(+), 15 deletions(-)

diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 0266384..d70497e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -125,18 +125,11 @@ public class AvroRelConverter implements 
SamzaRelConverter {
         .collect(Collectors.toList()));
   }
 
-  private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord 
avroRecord) {
+  private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord 
avroRecord, Schema schema) {
     List<Object> fieldValues = new ArrayList<>();
     List<String> fieldNames = new ArrayList<>();
     if (avroRecord != null) {
-      fieldNames.addAll(
-          
avroRecord.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
-      fieldValues.addAll(avroRecord.getSchema()
-          .getFields()
-          .stream()
-          .map(f -> 
convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
-              
getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
-          .collect(Collectors.toList()));
+      fetchFieldNamesAndValuesFromIndexedRecord(avroRecord, fieldNames, 
fieldValues, schema);
     } else {
       String msg = "Avro Record is null";
       LOG.error(msg);
@@ -231,7 +224,7 @@ public class AvroRelConverter implements SamzaRelConverter {
     }
     switch (schema.getType()) {
       case RECORD:
-        return convertToRelRecord((IndexedRecord) avroObj);
+        return convertToRelRecord((IndexedRecord) avroObj, schema);
       case ARRAY: {
         ArrayList<Object> retVal = new ArrayList<>();
         List<Object> avroArray;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 69d301d..4350889 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -81,13 +81,11 @@ public class SqlIOConfig {
     this.streamId = String.format("%s-%s", systemName, streamName);
 
     samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
-    Validate.notEmpty(samzaRelConverterName, String.format("System %s is 
unknown. %s is not set or empty for this"
-        + " system", systemName, CFG_SAMZA_REL_CONVERTER));
+    Validate.notEmpty(samzaRelConverterName, String.format("System %s is not 
supported. Please check if the system name is provided correctly.", 
systemName));
 
     if (isRemoteTable()) {
       samzaRelTableKeyConverterName = 
streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
-      Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System 
%s is unknown. %s is not set or empty for"
-          + " this system", systemName, CFG_SAMZA_REL_CONVERTER));
+      Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System 
%s is not supported. Please check if the system name is provided correctly.", 
systemName));
     } else {
       samzaRelTableKeyConverterName = "";
     }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java 
b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index 630d3f3..70c0c8b 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -53,7 +53,6 @@ import org.apache.calcite.tools.Planner;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.interfaces.SamzaSqlDriver;
 import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
-import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.planner.SamzaSqlValidator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Reply via email to