Repository: samza Updated Branches: refs/heads/master 025f61710 -> 0c0904b01
SAMZA-1779: Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #575 from atoomula/bytes1 and squashes the following commits: 855a03d7 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types df4886d8 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types 80268fc1 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0c0904b0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0c0904b0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0c0904b0 Branch: refs/heads/master Commit: 0c0904b01f1467b29d755401aa05cc48d1098270 Parents: 025f617 Author: Aditya Toomula <[email protected]> Authored: Tue Jul 24 14:25:19 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Tue Jul 24 14:25:19 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/sql/avro/AvroRelConverter.java | 14 +++++++++++--- .../samza/sql/avro/TestAvroRelConversion.java | 17 ++++++++++++++++- .../samza/sql/avro/schemas/ComplexRecord.java | 2 +- .../org/apache/samza/sql/avro/schemas/MyFixed.java | 5 ++++- .../samza/sql/system/TestAvroSystemFactory.java | 9 +++++++++ .../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 5 ++++- 6 files changed, 45 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
index ed22cc5..7d97466 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java @@ -19,6 +19,7 @@ package org.apache.samza.sql.avro; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -29,6 +30,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.calcite.avatica.util.ByteString; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; @@ -172,10 +174,12 @@ public class AvroRelConverter implements SamzaRelConverter { getNonNullUnionSchema(schema).getValueType()))); case UNION: return convertToAvroObject(relObj, getNonNullUnionSchema(schema)); - case FIXED: - return new GenericData.Fixed(schema, ((String) relObj).getBytes()); case ENUM: return new GenericData.EnumSymbol(schema, (String) relObj); + case FIXED: + return new GenericData.Fixed(schema, ((ByteString) relObj).getBytes()); + case BYTES: + return ByteBuffer.wrap(((ByteString) relObj).getBytes()); default: return relObj; } @@ -218,8 +222,12 @@ public class AvroRelConverter implements SamzaRelConverter { case UNION: return convertToJavaObject(avroObj, getNonNullUnionSchema(schema)); case ENUM: - case FIXED: return avroObj.toString(); + case FIXED: + GenericData.Fixed fixed = (GenericData.Fixed) avroObj; + return new ByteString(fixed.bytes()); + case BYTES: + return new ByteString(((ByteBuffer) avroObj).array()); default: return avroObj; http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java index 61abdfc..708eb3e 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java @@ -43,6 +43,7 @@ import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.util.Utf8; +import org.apache.calcite.avatica.util.ByteString; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.sql.type.SqlTypeName; @@ -51,6 +52,7 @@ import org.apache.samza.operators.KV; import org.apache.samza.sql.avro.schemas.AddressRecord; import org.apache.samza.sql.avro.schemas.ComplexRecord; import org.apache.samza.sql.avro.schemas.Kind; +import org.apache.samza.sql.avro.schemas.MyFixed; import org.apache.samza.sql.avro.schemas.PhoneNumber; import org.apache.samza.sql.avro.schemas.Profile; import org.apache.samza.sql.avro.schemas.SimpleRecord; @@ -66,6 +68,9 @@ import org.slf4j.LoggerFactory; public class TestAvroRelConversion { private static final Logger LOG = LoggerFactory.getLogger(TestAvroRelConversion.class); + private static final byte[] DEFAULT_TRACKING_ID_BYTES = + {76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, -1}; + private final AvroRelConverter simpleRecordAvroRelConverter; private final AvroRelConverter complexRecordAvroRelConverter; private final AvroRelConverter nestedRecordAvroRelConverter; @@ -79,6 +84,7 @@ public class TestAvroRelConversion { private float floatValue = 0.6f; private String testStrValue = "testString"; private ByteBuffer testBytes = ByteBuffer.wrap("testBytes".getBytes()); + private MyFixed fixedBytes = new MyFixed(); private long longValue = 200L; private HashMap<String, String> mapValue = new HashMap<String, String>() {{ @@ -111,6 +117,8 @@ public class TestAvroRelConversion { 
complexRecordAvroRelConverter = new AvroRelConverter(ss1, complexRecordSchemaProvider, new MapConfig()); simpleRecordAvroRelConverter = new AvroRelConverter(ss2, simpleRecordSchemaProvider, new MapConfig()); nestedRecordAvroRelConverter = new AvroRelConverter(ss3, nestedRecordSchemaProvider, new MapConfig()); + + fixedBytes.bytes(DEFAULT_TRACKING_ID_BYTES); } @Test @@ -190,6 +198,7 @@ public class TestAvroRelConversion { record.put("float_value", floatValue); record.put("string_value", testStrValue); record.put("bytes_value", testBytes); + record.put("fixed_value", fixedBytes); record.put("long_value", longValue); record.put("array_values", arrayValue); record.put("map_values", mapValue); @@ -201,6 +210,7 @@ public class TestAvroRelConversion { complexRecord.float_value = floatValue; complexRecord.string_value = testStrValue; complexRecord.bytes_value = testBytes; + complexRecord.fixed_value = fixedBytes; complexRecord.long_value = longValue; complexRecord.array_values = new ArrayList<>(); complexRecord.array_values.addAll(arrayValue); @@ -304,7 +314,12 @@ public class TestAvroRelConversion { .collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new Utf8(y.getValue()))) .equals(message.getSamzaSqlRelRecord().getField("map_values").get())); - Assert.assertTrue(message.getSamzaSqlRelRecord().getField("bytes_value").get().equals(testBytes)); + Assert.assertTrue( + Arrays.equals(((ByteString) message.getSamzaSqlRelRecord().getField("bytes_value").get()).getBytes(), + testBytes.array())); + Assert.assertTrue( + Arrays.equals(((ByteString) message.getSamzaSqlRelRecord().getField("fixed_value").get()).getBytes(), + DEFAULT_TRACKING_ID_BYTES)); LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues())); LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames())); http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java index dfba35e..f46918a 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java @@ -26,7 +26,7 @@ package org.apache.samza.sql.avro.schemas; @SuppressWarnings("all") public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array values in the record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map values in the 
record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum value.\",\"default\":[]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}"); + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array values in the record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map values in the record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum 
value.\",\"default\":[]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}"); /** Record id. */ public java.lang.Integer id; /** Boolean Value. */ http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java index 1405c84..8aaaee8 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java @@ -26,4 +26,7 @@ package org.apache.samza.sql.avro.schemas; @SuppressWarnings("all") @org.apache.avro.specific.FixedSize(16) -public class MyFixed extends org.apache.avro.specific.SpecificFixed {} +public class MyFixed extends org.apache.avro.specific.SpecificFixed { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"fixed\",\"name\":\"MyFixed\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"size\":16}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } +} http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java index 9a45034..676781c 100644 --- 
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java @@ -19,6 +19,7 @@ package org.apache.samza.sql.system; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -30,12 +31,14 @@ import java.util.stream.IntStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.calcite.avatica.util.ByteString; import org.apache.samza.config.Config; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.sql.avro.schemas.AddressRecord; import org.apache.samza.sql.avro.schemas.Company; import org.apache.samza.sql.avro.schemas.ComplexRecord; import org.apache.samza.sql.avro.schemas.Kind; +import org.apache.samza.sql.avro.schemas.MyFixed; import org.apache.samza.sql.avro.schemas.PageView; import org.apache.samza.sql.avro.schemas.PhoneNumber; import org.apache.samza.sql.avro.schemas.Profile; @@ -67,6 +70,8 @@ public class TestAvroSystemFactory implements SystemFactory { "444-444-4444", "555-555-5555"}; public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"}; public static final String[] pageKeys = {"inbox", "home", "search", "pymk", "group", "job"}; + public static final byte[] DEFAULT_TRACKING_ID_BYTES = + {76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, -1}; public static List<OutgoingMessageEnvelope> messages = new ArrayList<>(); @@ -291,6 +296,10 @@ public class TestAvroSystemFactory implements SystemFactory { GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$); record.put("id", index); record.put("string_value", "Name" + index); + record.put("bytes_value", ByteBuffer.wrap(("sample bytes").getBytes())); + MyFixed myFixedVar = new MyFixed(); + myFixedVar.bytes(DEFAULT_TRACKING_ID_BYTES); + record.put("fixed_value", myFixedVar); GenericData.Array<String> arrayValues = new 
GenericData.Array<>(index, ComplexRecord.SCHEMA$.getField("array_values").schema().getTypes().get(1)); arrayValues.addAll(IntStream.range(0, index).mapToObj(String::valueOf).collect(Collectors.toList())); http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index d511a39..4ee42ae 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -94,9 +94,12 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath()); String sql1 = - "Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1"; + "Insert into testavro.outputTopic " + + "select Flatten(array_values) as string_value, id, bytes_value, fixed_value " + + "from testavro.COMPLEX1"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
