Repository: samza
Updated Branches:
  refs/heads/master 025f61710 -> 0c0904b01


SAMZA-1779: Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types

Author: Aditya Toomula <[email protected]>

Reviewers: Srinivasulu Punuru <[email protected]>

Closes #575 from atoomula/bytes1 and squashes the following commits:

855a03d7 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types
df4886d8 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types
80268fc1 [Aditya Toomula] Fix SamzaSql AvroRelConverter to handle BYTES and FIXED avro schema types


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0c0904b0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0c0904b0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0c0904b0

Branch: refs/heads/master
Commit: 0c0904b01f1467b29d755401aa05cc48d1098270
Parents: 025f617
Author: Aditya Toomula <[email protected]>
Authored: Tue Jul 24 14:25:19 2018 -0700
Committer: Srinivasulu Punuru <[email protected]>
Committed: Tue Jul 24 14:25:19 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/sql/avro/AvroRelConverter.java    | 14 +++++++++++---
 .../samza/sql/avro/TestAvroRelConversion.java      | 17 ++++++++++++++++-
 .../samza/sql/avro/schemas/ComplexRecord.java      |  2 +-
 .../org/apache/samza/sql/avro/schemas/MyFixed.java |  5 ++++-
 .../samza/sql/system/TestAvroSystemFactory.java    |  9 +++++++++
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  |  5 ++++-
 6 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index ed22cc5..7d97466 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.avro;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.calcite.avatica.util.ByteString;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -172,10 +174,12 @@ public class AvroRelConverter implements 
SamzaRelConverter {
                 getNonNullUnionSchema(schema).getValueType())));
       case UNION:
         return convertToAvroObject(relObj, getNonNullUnionSchema(schema));
-      case FIXED:
-        return new GenericData.Fixed(schema, ((String) relObj).getBytes());
       case ENUM:
         return new GenericData.EnumSymbol(schema, (String) relObj);
+      case FIXED:
+        return new GenericData.Fixed(schema, ((ByteString) relObj).getBytes());
+      case BYTES:
+        return ByteBuffer.wrap(((ByteString) relObj).getBytes());
       default:
         return relObj;
     }
@@ -218,8 +222,12 @@ public class AvroRelConverter implements SamzaRelConverter 
{
       case UNION:
         return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
       case ENUM:
-      case FIXED:
         return avroObj.toString();
+      case FIXED:
+        GenericData.Fixed fixed = (GenericData.Fixed) avroObj;
+        return new ByteString(fixed.bytes());
+      case BYTES:
+        return new ByteString(((ByteBuffer) avroObj).array());
 
       default:
         return avroObj;

http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 61abdfc..708eb3e 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -43,6 +43,7 @@ import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -51,6 +52,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.sql.avro.schemas.AddressRecord;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
 import org.apache.samza.sql.avro.schemas.Kind;
+import org.apache.samza.sql.avro.schemas.MyFixed;
 import org.apache.samza.sql.avro.schemas.PhoneNumber;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
@@ -66,6 +68,9 @@ import org.slf4j.LoggerFactory;
 public class TestAvroRelConversion {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestAvroRelConversion.class);
+  private static final byte[] DEFAULT_TRACKING_ID_BYTES =
+      {76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, 
-1};
+
   private final AvroRelConverter simpleRecordAvroRelConverter;
   private final AvroRelConverter complexRecordAvroRelConverter;
   private final AvroRelConverter nestedRecordAvroRelConverter;
@@ -79,6 +84,7 @@ public class TestAvroRelConversion {
   private float floatValue = 0.6f;
   private String testStrValue = "testString";
   private ByteBuffer testBytes = ByteBuffer.wrap("testBytes".getBytes());
+  private MyFixed fixedBytes = new MyFixed();
   private long longValue = 200L;
 
   private HashMap<String, String> mapValue = new HashMap<String, String>() {{
@@ -111,6 +117,8 @@ public class TestAvroRelConversion {
     complexRecordAvroRelConverter = new AvroRelConverter(ss1, 
complexRecordSchemaProvider, new MapConfig());
     simpleRecordAvroRelConverter = new AvroRelConverter(ss2, 
simpleRecordSchemaProvider, new MapConfig());
     nestedRecordAvroRelConverter = new AvroRelConverter(ss3, 
nestedRecordSchemaProvider, new MapConfig());
+
+    fixedBytes.bytes(DEFAULT_TRACKING_ID_BYTES);
   }
 
   @Test
@@ -190,6 +198,7 @@ public class TestAvroRelConversion {
     record.put("float_value", floatValue);
     record.put("string_value", testStrValue);
     record.put("bytes_value", testBytes);
+    record.put("fixed_value", fixedBytes);
     record.put("long_value", longValue);
     record.put("array_values", arrayValue);
     record.put("map_values", mapValue);
@@ -201,6 +210,7 @@ public class TestAvroRelConversion {
     complexRecord.float_value = floatValue;
     complexRecord.string_value = testStrValue;
     complexRecord.bytes_value = testBytes;
+    complexRecord.fixed_value = fixedBytes;
     complexRecord.long_value = longValue;
     complexRecord.array_values = new ArrayList<>();
     complexRecord.array_values.addAll(arrayValue);
@@ -304,7 +314,12 @@ public class TestAvroRelConversion {
         .collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new 
Utf8(y.getValue())))
         .equals(message.getSamzaSqlRelRecord().getField("map_values").get()));
 
-    
Assert.assertTrue(message.getSamzaSqlRelRecord().getField("bytes_value").get().equals(testBytes));
+    Assert.assertTrue(
+        Arrays.equals(((ByteString) 
message.getSamzaSqlRelRecord().getField("bytes_value").get()).getBytes(),
+            testBytes.array()));
+    Assert.assertTrue(
+        Arrays.equals(((ByteString) 
message.getSamzaSqlRelRecord().getField("fixed_value").get()).getBytes(),
+            DEFAULT_TRACKING_ID_BYTES));
 
     
LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues()));
     
LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));

http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
index dfba35e..f46918a 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
@@ -26,7 +26,7 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record
 
id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean
 
Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double
 
Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float
 
Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string
 
Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes
 
Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long
 
Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed
  
Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array
 values in the 
record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map
 values in the 
record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum
 
value.\",\"default\":[]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub
 record 
id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub
 record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}");
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record
 
id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean
 
Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double
 
Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float
 
Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string
 
Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes
 
Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long
 
Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixe
 d 
Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array
 values in the 
record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map
 values in the 
record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum
 
value.\",\"default\":[]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub
 record 
id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub
 record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}");
   /** Record id. */
   public java.lang.Integer id;
   /** Boolean Value. */

http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java
index 1405c84..8aaaee8 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/MyFixed.java
@@ -26,4 +26,7 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 @org.apache.avro.specific.FixedSize(16)
-public class MyFixed extends org.apache.avro.specific.SpecificFixed {}
+public class MyFixed extends org.apache.avro.specific.SpecificFixed {
+  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"fixed\",\"name\":\"MyFixed\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"size\":16}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 9a45034..676781c 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.system;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,12 +31,14 @@ import java.util.stream.IntStream;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.calcite.avatica.util.ByteString;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.sql.avro.schemas.AddressRecord;
 import org.apache.samza.sql.avro.schemas.Company;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
 import org.apache.samza.sql.avro.schemas.Kind;
+import org.apache.samza.sql.avro.schemas.MyFixed;
 import org.apache.samza.sql.avro.schemas.PageView;
 import org.apache.samza.sql.avro.schemas.PhoneNumber;
 import org.apache.samza.sql.avro.schemas.Profile;
@@ -67,6 +70,8 @@ public class TestAvroSystemFactory implements SystemFactory {
       "444-444-4444", "555-555-5555"};
   public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", 
"AMZN", "CSCO"};
   public static final String[] pageKeys = {"inbox", "home", "search", "pymk", 
"group", "job"};
+  public static final byte[] DEFAULT_TRACKING_ID_BYTES =
+      {76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, 
-1};
 
   public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
 
@@ -291,6 +296,10 @@ public class TestAvroSystemFactory implements 
SystemFactory {
       GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$);
       record.put("id", index);
       record.put("string_value", "Name" + index);
+      record.put("bytes_value", ByteBuffer.wrap(("sample bytes").getBytes()));
+      MyFixed myFixedVar = new MyFixed();
+      myFixedVar.bytes(DEFAULT_TRACKING_ID_BYTES);
+      record.put("fixed_value", myFixedVar);
       GenericData.Array<String> arrayValues =
           new GenericData.Array<>(index, 
ComplexRecord.SCHEMA$.getField("array_values").schema().getTypes().get(1));
       arrayValues.addAll(IntStream.range(0, 
index).mapToObj(String::valueOf).collect(Collectors.toList()));

http://git-wip-us.apache.org/repos/asf/samza/blob/0c0904b0/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index d511a39..4ee42ae 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -94,9 +94,12 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+
     LOG.info(" Class Path : " + 
RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
-        "Insert into testavro.outputTopic select Flatten(array_values) as 
string_value, id from testavro.COMPLEX1";
+        "Insert into testavro.outputTopic "
+            + "select Flatten(array_values) as string_value, id, bytes_value, 
fixed_value "
+            + "from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));

Reply via email to