This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f44b4f7 Fix AvroRelConverter to only consider cached schema while
populating SamzaSqlRelRecord for all the nested records.
new fb20518 Merge pull request #1233 from atoomula/schema_fix
f44b4f7 is described below
commit f44b4f7900f1e777f0845431db69003a221c7dd2
Author: Aditya Toomula <[email protected]>
AuthorDate: Mon Dec 9 09:50:29 2019 -0800
Fix AvroRelConverter to only consider cached schema while populating
SamzaSqlRelRecord for all the nested records.
---
.../java/org/apache/samza/sql/avro/AvroRelConverter.java | 13 +++----------
.../java/org/apache/samza/sql/interfaces/SqlIOConfig.java | 6 ++----
.../java/org/apache/samza/sql/util/SamzaSqlQueryParser.java | 1 -
3 files changed, 5 insertions(+), 15 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 0266384..d70497e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -125,18 +125,11 @@ public class AvroRelConverter implements SamzaRelConverter {
.collect(Collectors.toList()));
}
- private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
+ private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord, Schema schema) {
List<Object> fieldValues = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
if (avroRecord != null) {
- fieldNames.addAll(
- avroRecord.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
- fieldValues.addAll(avroRecord.getSchema()
- .getFields()
- .stream()
- .map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
- getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
- .collect(Collectors.toList()));
+ fetchFieldNamesAndValuesFromIndexedRecord(avroRecord, fieldNames, fieldValues, schema);
} else {
String msg = "Avro Record is null";
LOG.error(msg);
@@ -231,7 +224,7 @@ public class AvroRelConverter implements SamzaRelConverter {
}
switch (schema.getType()) {
case RECORD:
- return convertToRelRecord((IndexedRecord) avroObj);
+ return convertToRelRecord((IndexedRecord) avroObj, schema);
case ARRAY: {
ArrayList<Object> retVal = new ArrayList<>();
List<Object> avroArray;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 69d301d..4350889 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -81,13 +81,11 @@ public class SqlIOConfig {
this.streamId = String.format("%s-%s", systemName, streamName);
samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
- Validate.notEmpty(samzaRelConverterName, String.format("System %s is unknown. %s is not set or empty for this"
- + " system", systemName, CFG_SAMZA_REL_CONVERTER));
+ Validate.notEmpty(samzaRelConverterName, String.format("System %s is not supported. Please check if the system name is provided correctly.", systemName));
if (isRemoteTable()) {
samzaRelTableKeyConverterName = streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
- Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is unknown. %s is not set or empty for"
- + " this system", systemName, CFG_SAMZA_REL_CONVERTER));
+ Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is not supported. Please check if the system name is provided correctly.", systemName));
} else {
samzaRelTableKeyConverterName = "";
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index 630d3f3..70c0c8b 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -53,7 +53,6 @@ import org.apache.calcite.tools.Planner;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.interfaces.SamzaSqlDriver;
import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
-import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.planner.SamzaSqlValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;