Repository: samza Updated Branches: refs/heads/master 603af35a5 -> 445d1e26c
SAMZA-1693: Samza-sql - Adding Serde for rel record and a few other minor fixes for Avro and Rel conversion. Adding Serde for rel record, as Calcite expects the keys to be in string format. Rel converters are always expected to provide keys as strings. If key is an avro record, it is expected that the rel converter changes the avro record to rel record and serializes it and deserializes it when converting rel message to samza message. Author: Aditya Toomula <[email protected]> Reviewers: Srini P <[email protected]> Closes #495 from atoomula/rel1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/445d1e26 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/445d1e26 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/445d1e26 Branch: refs/heads/master Commit: 445d1e26c9a222061b43b9cb5a637358039270b9 Parents: 603af35 Author: Aditya Toomula <[email protected]> Authored: Mon Apr 30 16:51:37 2018 -0700 Committer: Jagadish <[email protected]> Committed: Mon Apr 30 16:51:37 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/sql/avro/AvroRelConverter.java | 65 +++++---------- .../SamzaSqlRelRecordSerdeFactory.java | 67 +++++++++++++++ .../samza/sql/TestSamzaSqlRelMessageSerde.java | 3 +- .../samza/sql/TestSamzaSqlRelRecordSerde.java | 87 ++++++++++++++++++++ 4 files changed, 175 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java index f121983..c9c30cc 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java +++ 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java @@ -102,14 +102,7 @@ public class AvroRelConverter implements SamzaRelConverter { throw new SamzaException(msg); } - Object key = samzaMessage.getKey(); - if (key != null && key instanceof IndexedRecord) { - IndexedRecord keyRecord = (IndexedRecord) key; - Schema keySchema = keyRecord.getSchema(); - key = convertToJavaObject(samzaMessage.getKey(), keySchema); - } - - return new SamzaSqlRelMessage(key, fieldNames, fieldValues); + return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, fieldValues); } private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) { @@ -169,17 +162,11 @@ public class AvroRelConverter implements SamzaRelConverter { case RECORD: return convertToGenericRecord((SamzaSqlRelRecord) relObj, getNonNullUnionSchema(schema)); case ARRAY: - if (((List<Object>) relObj).size() == 0) { - return null; - } List<Object> avroList = ((List<Object>) relObj).stream() .map(o -> convertToAvroObject(o, getNonNullUnionSchema(schema).getElementType())) .collect(Collectors.toList()); return avroList; case MAP: - if (((Map<String, ?>) relObj).size() == 0) { - return null; - } return ((Map<String, ?>) relObj).entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> convertToAvroObject(e.getValue(), @@ -198,53 +185,41 @@ public class AvroRelConverter implements SamzaRelConverter { // Not doing any validations of data types with Avro schema considering the resource cost per message. // Casting would fail if the data types are not in sync with the schema. 
public Object convertToJavaObject(Object avroObj, Schema schema) { + if (avroObj == null) { + return null; + } switch(schema.getType()) { case RECORD: - if (avroObj == null) { - return null; - } return convertToRelRecord((IndexedRecord) avroObj); case ARRAY: { ArrayList<Object> retVal = new ArrayList<>(); - if (avroObj != null) { - List<Object> avroArray; - if (avroObj instanceof GenericData.Array) { - avroArray = (GenericData.Array) avroObj; - } else if (avroObj instanceof List) { - avroArray = (List) avroObj; - } else { - throw new SamzaException("Unsupported array type " + avroObj.getClass().getSimpleName()); - } - - if (avroArray != null) { - retVal.addAll( - avroArray.stream() - .map(v -> convertToJavaObject(v, getNonNullUnionSchema(schema).getElementType())) - .collect(Collectors.toList())); - } + List<Object> avroArray; + if (avroObj instanceof GenericData.Array) { + avroArray = (GenericData.Array) avroObj; + } else if (avroObj instanceof List) { + avroArray = (List) avroObj; + } else { + throw new SamzaException("Unsupported array type " + avroObj.getClass().getSimpleName()); } + + retVal.addAll( + avroArray.stream() + .map(v -> convertToJavaObject(v, getNonNullUnionSchema(schema).getElementType())) + .collect(Collectors.toList())); return retVal; } case MAP: { Map<String, Object> retVal = new HashMap<>(); - if (avroObj != null) { - retVal.putAll(((Map<String, ?>) avroObj).entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> convertToJavaObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())))); - } + retVal.putAll(((Map<String, ?>) avroObj).entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> convertToJavaObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())))); return retVal; } case UNION: - if (avroObj == null) { - return null; - } return convertToJavaObject(avroObj, getNonNullUnionSchema(schema)); case ENUM: case FIXED: - if (avroObj == null) { - return null; - } return 
avroObj.toString(); default: http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java new file mode 100644 index 0000000..8a22047 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java @@ -0,0 +1,67 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.serializers; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + + +/** + * A serializer for {@link SamzaSqlRelMessage.SamzaSqlRelRecord}. 
This serializer preserves the type information as + * {@link SamzaSqlRelMessage.SamzaSqlRelRecord} and contains nested {@link SamzaSqlRelMessage.SamzaSqlRelRecord} + * records. + */ +public final class SamzaSqlRelRecordSerdeFactory implements SerdeFactory<SamzaSqlRelMessage.SamzaSqlRelRecord> { + public Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> getSerde(String name, Config config) { + return new SamzaSqlRelRecordSerde(); + } + + public final static class SamzaSqlRelRecordSerde implements Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> { + + @Override + public SamzaSqlRelMessage.SamzaSqlRelRecord fromBytes(byte[] bytes) { + try { + ObjectMapper mapper = new ObjectMapper(); + // Enable object typing to handle nested records + mapper.enableDefaultTyping(); + return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<SamzaSqlRelMessage.SamzaSqlRelRecord>() {}); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public byte[] toBytes(SamzaSqlRelMessage.SamzaSqlRelRecord p) { + try { + ObjectMapper mapper = new ObjectMapper(); + // Enable object typing to handle nested records + mapper.enableDefaultTyping(); + return mapper.writeValueAsString(p).getBytes("UTF-8"); + } catch (Exception e) { + throw new SamzaException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java index 94695c4..381a3cb 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java @@ -75,7 +75,6 @@ public class TestSamzaSqlRelMessageSerde { SamzaSqlRelMessageSerde serde = (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null); SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(messageRecordPair.getKey())); - nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg); KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg); GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue(); @@ -85,7 +84,7 @@ public class TestSamzaSqlRelMessageSerde { } } - private Pair<SamzaSqlRelMessage, GenericData.Record> createNestedSamzaSqlRelMessage( + public static Pair<SamzaSqlRelMessage, GenericData.Record> createNestedSamzaSqlRelMessage( AvroRelConverter nestedRecordAvroRelConverter) { GenericData.Record record = new GenericData.Record(Profile.SCHEMA$); record.put("id", 1); http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java new file mode 100644 index 0000000..95b4972 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java @@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.Profile;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
+import org.apache.samza.system.SystemStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+// NOTE(review): javafx.util.Pair is only available on JDKs that bundle JavaFX. It has to match the return type of
+// TestSamzaSqlRelMessageSerde.createNestedSamzaSqlRelMessage, so replacing it means changing both test classes.
+import javafx.util.Pair;
+
+import static org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
+
+
+/**
+ * Round-trip tests for {@link SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde}: a record passed through
+ * {@code toBytes} and then {@code fromBytes} must come back with the same field names and values, including nested records.
+ */
+public class TestSamzaSqlRelRecordSerde {
+
+  private final List<Object> values = Arrays.asList("value1", 1, null);
+  private final List<String> names = Arrays.asList("field1", "field2", "field3");
+
+  @Test
+  public void testWithDifferentFields() {
+    SamzaSqlRelRecord record = new SamzaSqlRelMessage(names, values).getSamzaSqlRelRecord();
+    SamzaSqlRelRecordSerde serde =
+        (SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+    SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(record));
+    Assert.assertEquals(names, resultRecord.getFieldNames());
+    Assert.assertEquals(values, resultRecord.getFieldValues());
+  }
+
+  @Test
+  public void testNestedRecordConversion() {
+    Map<String, String> props = new HashMap<>();
+    SystemStream ss1 = new SystemStream("test", "nestedRecord");
+    props.put(
+        String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()),
+        Profile.SCHEMA$.toString());
+    ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory();
+    AvroRelSchemaProvider nestedRecordSchemaProvider =
+        (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props));
+    AvroRelConverter nestedRecordAvroRelConverter =
+        new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig());
+
+    Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair =
+        TestSamzaSqlRelMessageSerde.createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter);
+    SamzaSqlRelRecordSerde serde =
+        (SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+    SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(messageRecordPair.getKey().getSamzaSqlRelRecord()));
+    // Convert the round-tripped record back to Avro so it can be compared with the source record.
+    GenericData.Record recordPostConversion =
+        (GenericData.Record) nestedRecordAvroRelConverter.convertToAvroObject(resultRecord, Profile.SCHEMA$);
+
+    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
+      // equals() on GenericRecord does the nested record equality check as well.
+      Assert.assertEquals(messageRecordPair.getValue().get(field.name()), recordPostConversion.get(field.name()));
+    }
+  }
+}
