Repository: samza
Updated Branches:
  refs/heads/master 603af35a5 -> 445d1e26c


SAMZA-1693: Samza-sql - Adding Serde for rel record and a few other minor fixes
for Avro and Rel conversion.

Adding a Serde for rel records, as Calcite expects keys to be in string
format. Rel converters are always expected to provide keys as strings. If the key
is an Avro record, the rel converter is expected to convert the Avro record to a
rel record and serialize it, and to deserialize it again when converting the rel
message back to a Samza message.

Author: Aditya Toomula <[email protected]>

Reviewers: Srini P<[email protected]>

Closes #495 from atoomula/rel1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/445d1e26
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/445d1e26
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/445d1e26

Branch: refs/heads/master
Commit: 445d1e26c9a222061b43b9cb5a637358039270b9
Parents: 603af35
Author: Aditya Toomula <[email protected]>
Authored: Mon Apr 30 16:51:37 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon Apr 30 16:51:37 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/sql/avro/AvroRelConverter.java | 65 +++++----------
 .../SamzaSqlRelRecordSerdeFactory.java          | 67 +++++++++++++++
 .../samza/sql/TestSamzaSqlRelMessageSerde.java  |  3 +-
 .../samza/sql/TestSamzaSqlRelRecordSerde.java   | 87 ++++++++++++++++++++
 4 files changed, 175 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index f121983..c9c30cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -102,14 +102,7 @@ public class AvroRelConverter implements SamzaRelConverter 
{
       throw new SamzaException(msg);
     }
 
-    Object key = samzaMessage.getKey();
-    if (key != null && key instanceof IndexedRecord) {
-      IndexedRecord keyRecord = (IndexedRecord) key;
-      Schema keySchema = keyRecord.getSchema();
-      key = convertToJavaObject(samzaMessage.getKey(), keySchema);
-    }
-
-    return new SamzaSqlRelMessage(key, fieldNames, fieldValues);
+    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, 
fieldValues);
   }
 
   private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
@@ -169,17 +162,11 @@ public class AvroRelConverter implements 
SamzaRelConverter {
       case RECORD:
         return convertToGenericRecord((SamzaSqlRelRecord) relObj, 
getNonNullUnionSchema(schema));
       case ARRAY:
-        if (((List<Object>) relObj).size() == 0) {
-          return null;
-        }
         List<Object> avroList = ((List<Object>) relObj).stream()
             .map(o -> convertToAvroObject(o, 
getNonNullUnionSchema(schema).getElementType()))
             .collect(Collectors.toList());
         return avroList;
       case MAP:
-        if (((Map<String, ?>) relObj).size() == 0) {
-          return null;
-        }
         return ((Map<String, ?>) relObj).entrySet()
             .stream()
             .collect(Collectors.toMap(Map.Entry::getKey, e -> 
convertToAvroObject(e.getValue(),
@@ -198,53 +185,41 @@ public class AvroRelConverter implements 
SamzaRelConverter {
   // Not doing any validations of data types with Avro schema considering the 
resource cost per message.
   // Casting would fail if the data types are not in sync with the schema.
   public Object convertToJavaObject(Object avroObj, Schema schema) {
+    if (avroObj == null) {
+      return null;
+    }
     switch(schema.getType()) {
       case RECORD:
-        if (avroObj == null) {
-          return null;
-        }
         return convertToRelRecord((IndexedRecord) avroObj);
       case ARRAY: {
         ArrayList<Object> retVal = new ArrayList<>();
-        if (avroObj != null) {
-          List<Object> avroArray;
-          if (avroObj instanceof GenericData.Array) {
-            avroArray = (GenericData.Array) avroObj;
-          } else if (avroObj instanceof List) {
-            avroArray = (List) avroObj;
-          } else {
-            throw new SamzaException("Unsupported array type " + 
avroObj.getClass().getSimpleName());
-          }
-
-          if (avroArray != null) {
-            retVal.addAll(
-                avroArray.stream()
-                    .map(v -> convertToJavaObject(v, 
getNonNullUnionSchema(schema).getElementType()))
-                    .collect(Collectors.toList()));
-          }
+        List<Object> avroArray;
+        if (avroObj instanceof GenericData.Array) {
+          avroArray = (GenericData.Array) avroObj;
+        } else if (avroObj instanceof List) {
+          avroArray = (List) avroObj;
+        } else {
+          throw new SamzaException("Unsupported array type " + 
avroObj.getClass().getSimpleName());
         }
+
+        retVal.addAll(
+            avroArray.stream()
+                .map(v -> convertToJavaObject(v, 
getNonNullUnionSchema(schema).getElementType()))
+                .collect(Collectors.toList()));
         return retVal;
       }
       case MAP: {
         Map<String, Object> retVal = new HashMap<>();
-        if (avroObj != null) {
-          retVal.putAll(((Map<String, ?>) avroObj).entrySet().stream()
-              .collect(Collectors.toMap(
-                  Map.Entry::getKey,
-                  e -> convertToJavaObject(e.getValue(), 
getNonNullUnionSchema(schema).getValueType()))));
-        }
+        retVal.putAll(((Map<String, ?>) avroObj).entrySet().stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                e -> convertToJavaObject(e.getValue(), 
getNonNullUnionSchema(schema).getValueType()))));
         return retVal;
       }
       case UNION:
-        if (avroObj == null) {
-          return null;
-        }
         return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
       case ENUM:
       case FIXED:
-        if (avroObj == null) {
-          return null;
-        }
         return avroObj.toString();
 
       default:

http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
new file mode 100644
index 0000000..8a22047
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
@@ -0,0 +1,67 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.serializers;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+
+/**
+ * A serializer for {@link SamzaSqlRelMessage.SamzaSqlRelRecord}. This 
serializer preserves the type information as
+ * {@link SamzaSqlRelMessage.SamzaSqlRelRecord} and contains nested {@link 
SamzaSqlRelMessage.SamzaSqlRelRecord}
+ * records.
+ */
+public final class SamzaSqlRelRecordSerdeFactory implements 
SerdeFactory<SamzaSqlRelMessage.SamzaSqlRelRecord> {
+  public Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> getSerde(String name, 
Config config) {
+    return new SamzaSqlRelRecordSerde();
+  }
+
+  public final static class SamzaSqlRelRecordSerde implements 
Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> {
+
+    @Override
+    public SamzaSqlRelMessage.SamzaSqlRelRecord fromBytes(byte[] bytes) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        // Enable object typing to handle nested records
+        mapper.enableDefaultTyping();
+        return mapper.readValue(new String(bytes, "UTF-8"), new 
TypeReference<SamzaSqlRelMessage.SamzaSqlRelRecord>() {});
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(SamzaSqlRelMessage.SamzaSqlRelRecord p) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        // Enable object typing to handle nested records
+        mapper.enableDefaultTyping();
+        return mapper.writeValueAsString(p).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
index 94695c4..381a3cb 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -75,7 +75,6 @@ public class TestSamzaSqlRelMessageSerde {
     SamzaSqlRelMessageSerde serde =
         (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
     SamzaSqlRelMessage resultMsg = 
serde.fromBytes(serde.toBytes(messageRecordPair.getKey()));
-    nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg);
     KV<Object, Object> samzaMessage = 
nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg);
     GenericRecord recordPostConversion = (GenericRecord) 
samzaMessage.getValue();
 
@@ -85,7 +84,7 @@ public class TestSamzaSqlRelMessageSerde {
     }
   }
 
-  private Pair<SamzaSqlRelMessage, GenericData.Record> 
createNestedSamzaSqlRelMessage(
+  public static Pair<SamzaSqlRelMessage, GenericData.Record> 
createNestedSamzaSqlRelMessage(
       AvroRelConverter nestedRecordAvroRelConverter) {
     GenericData.Record record = new GenericData.Record(Profile.SCHEMA$);
     record.put("id", 1);

http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
new file mode 100644
index 0000000..95b4972
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
@@ -0,0 +1,87 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.Profile;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
+import org.apache.samza.system.SystemStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javafx.util.Pair;
+
+import static 
org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
+
+
+public class TestSamzaSqlRelRecordSerde {
+
+  private List<Object> values = Arrays.asList("value1", 1, null);
+  private List<String> names = Arrays.asList("field1", "field2", "field3");
+
+  @Test
+  public void testWithDifferentFields() {
+    SamzaSqlRelRecord record = new SamzaSqlRelMessage(names, 
values).getSamzaSqlRelRecord();
+    SamzaSqlRelRecordSerde serde =
+        (SamzaSqlRelRecordSerde) new 
SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+    SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(record));
+    Assert.assertEquals(names, resultRecord.getFieldNames());
+    Assert.assertEquals(values, resultRecord.getFieldValues());
+  }
+
+  @Test
+  public void testNestedRecordConversion() {
+    Map<String, String> props = new HashMap<>();
+    SystemStream ss1 = new SystemStream("test", "nestedRecord");
+    props.put(
+        
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, 
ss1.getSystem(), ss1.getStream()),
+        Profile.SCHEMA$.toString());
+    ConfigBasedAvroRelSchemaProviderFactory factory = new 
ConfigBasedAvroRelSchemaProviderFactory();
+    AvroRelSchemaProvider nestedRecordSchemaProvider =
+        (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props));
+    AvroRelConverter nestedRecordAvroRelConverter =
+        new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig());
+
+    Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair =
+        
TestSamzaSqlRelMessageSerde.createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter);
+    SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde serde =
+        (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new 
SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+    SamzaSqlRelRecord resultRecord = 
serde.fromBytes(serde.toBytes(messageRecordPair.getKey().getSamzaSqlRelRecord()));
+    GenericData.Record recordPostConversion =
+        (GenericData.Record) 
nestedRecordAvroRelConverter.convertToAvroObject(resultRecord, Profile.SCHEMA$);
+
+    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
+      // equals() on GenericRecord does the nested record equality check as 
well.
+      Assert.assertEquals(messageRecordPair.getValue().get(field.name()), 
recordPostConversion.get(field.name()));
+    }
+  }
+}
\ No newline at end of file

Reply via email to