Repository: samza Updated Branches: refs/heads/master 2a9e729ad -> af4ddc4a9
SAMZA-1740: Moving SamzaSqlRelRecord to samza-api as it is needed to be used in UDFs Please see description in the ticket. Also, implementing equals and hashCode methods for SamzaSqlRelRecord and SamzaSqlRelMessage. Author: Aditya Toomula <[email protected]> Reviewers: Srini P<[email protected]>, Jagadish <[email protected]> Closes #545 from atoomula/sql Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/af4ddc4a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/af4ddc4a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/af4ddc4a Branch: refs/heads/master Commit: af4ddc4a97c63ff167e33727ba94da91500b7f42 Parents: 2a9e729 Author: Aditya Toomula <[email protected]> Authored: Mon Jun 11 10:49:45 2018 -0700 Committer: Jagadish <[email protected]> Committed: Mon Jun 11 10:49:45 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/sql/SamzaSqlRelRecord.java | 113 +++++++++++++++++++ .../apache/samza/sql/TestSamzaSqlRelRecord.java | 43 +++++++ .../apache/samza/sql/avro/AvroRelConverter.java | 3 +- .../samza/sql/data/SamzaSqlRelMessage.java | 85 +++----------- .../SamzaSqlRelMessageSerdeFactory.java | 2 +- .../SamzaSqlRelRecordSerdeFactory.java | 18 +-- .../samza/sql/TestSamzaSqlRelMessageSerde.java | 102 ----------------- .../samza/sql/TestSamzaSqlRelRecordSerde.java | 86 -------------- .../samza/sql/data/TestSamzaSqlRelMessage.java | 18 +++ .../TestSamzaSqlRelMessageSerde.java | 102 +++++++++++++++++ .../serializers/TestSamzaSqlRelRecordSerde.java | 85 ++++++++++++++ 11 files changed, 389 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java new file mode 100644 index 0000000..e17a273 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java @@ -0,0 +1,113 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.samza.annotation.InterfaceStability; +import org.codehaus.jackson.annotate.JsonProperty; + + +/** + * Samza sql relational record. A record consists of list of column values and the associated column names. + * A column value could be nested, meaning, it could be another SamzaSqlRelRecord. + * Right now we do not store any metadata (like nullability, etc) other than the column name in the SamzaSqlRelRecord. + */ [email protected] +public class SamzaSqlRelRecord implements Serializable { + + @JsonProperty("fieldNames") + private final ArrayList<String> fieldNames; + @JsonProperty("fieldValues") + private final ArrayList<Object> fieldValues; + + /** + * Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values. + * @param fieldNames Ordered list of field names in the row. + * @param fieldValues Ordered list of all the values in the row. Some of the fields can be null. This could be + * result of delete change capture event in the stream or because of the result of the outer + * join or the fields themselves are null in the original stream. + */ + public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> fieldNames, + @JsonProperty("fieldValues") List<Object> fieldValues) { + if (fieldNames.size() != fieldValues.size()) { + throw new IllegalArgumentException("Field Names and values are not of same length."); + } + + this.fieldNames = new ArrayList<>(); + this.fieldValues = new ArrayList<>(); + + this.fieldNames.addAll(fieldNames); + this.fieldValues.addAll(fieldValues); + } + + /** + * Get the field names of all the columns in the relational message. + * @return the field names of all columns. + */ + @JsonProperty("fieldNames") + public List<String> getFieldNames() { + return this.fieldNames; + } + + /** + * Get the field values of all the columns in the relational message. + * @return the field values of all columns. + */ + @JsonProperty("fieldValues") + public List<Object> getFieldValues() { + return this.fieldValues; + } + + /** + * Get the value of the field corresponding to the field name. + * @param name Name of the field. + * @return returns the value of the field. + */ + public Optional<Object> getField(String name) { + for (int index = 0; index < fieldNames.size(); index++) { + if (fieldNames.get(index).equals(name)) { + return Optional.ofNullable(fieldValues.get(index)); + } + } + + return Optional.empty(); + } + + @Override + public int hashCode() { + return Objects.hash(fieldNames, fieldValues); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SamzaSqlRelRecord other = (SamzaSqlRelRecord) obj; + return Objects.equals(fieldNames, other.fieldNames) && Objects.equals(fieldValues, other.fieldValues); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java b/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java new file mode 100644 index 0000000..ac27991 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql; + +import java.util.Arrays; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestSamzaSqlRelRecord { + @Test + public void testEquality() { + SamzaSqlRelRecord relRecord1 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, "object")); + SamzaSqlRelRecord relRecord2 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, "object")); + assertEquals(relRecord1, relRecord2); + assertEquals(relRecord1.hashCode(), relRecord2.hashCode()); + } + + @Test + public void testInEquality() { + SamzaSqlRelRecord relRecord1 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, "object")); + SamzaSqlRelRecord relRecord2 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, null)); + assertNotEquals(relRecord1, relRecord2); + assertNotEquals(relRecord1.hashCode(), relRecord2.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java index c9c30cc..ed22cc5 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java @@ -32,14 +32,13 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; import org.apache.samza.system.SystemStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord; - /** * This class converts a Samza Avro messages to Relational messages and vice versa. http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java index 9bf1870..3ebbb23 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java @@ -22,14 +22,9 @@ package org.apache.samza.sql.data; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; +import java.util.Objects; import org.apache.commons.lang.Validate; -import org.apache.samza.SamzaException; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.codehaus.jackson.annotate.JsonProperty; @@ -96,6 +91,7 @@ public class SamzaSqlRelMessage implements Serializable { /** * Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}. + * @param samzaSqlRelRecord represents the rel record. */ public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord) { this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues()); @@ -110,67 +106,20 @@ public class SamzaSqlRelMessage implements Serializable { return key; } - /** - * Samza sql relational record. A record consists of list of column values and the associated column names. - * A column value could be nested, meaning, it could be another SamzaSqlRelRecord. - * Right now we do not store any metadata (like nullability, etc) other than the column name in the SamzaSqlRelRecord. - */ - public static class SamzaSqlRelRecord implements Serializable { - - @JsonProperty("fieldNames") - private final List<String> fieldNames; - @JsonProperty("fieldValues") - private final List<Object> fieldValues; - - /** - * Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values. - * @param fieldNames Ordered list of field names in the row. - * @param fieldValues Ordered list of all the values in the row. Some of the fields can be null. This could be - * result of delete change capture event in the stream or because of the result of the outer - * join or the fields themselves are null in the original stream. - */ - public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> fieldNames, - @JsonProperty("fieldValues") List<Object> fieldValues) { - Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length."); - - this.fieldNames = new ArrayList<>(); - this.fieldValues = new ArrayList<>(); - - this.fieldNames.addAll(fieldNames); - this.fieldValues.addAll(fieldValues); - } - - /** - * Get the field names of all the columns in the relational message. - * @return the field names of all columns. - */ - @JsonProperty("fieldNames") - public List<String> getFieldNames() { - return this.fieldNames; - } - - /** - * Get the field values of all the columns in the relational message. - * @return the field values of all columns. - */ - @JsonProperty("fieldValues") - public List<Object> getFieldValues() { - return this.fieldValues; - } + @Override + public int hashCode() { + return Objects.hash(key, samzaSqlRelRecord); + } - /** - * Get the value of the field corresponding to the field name. - * @param name Name of the field. - * @return returns the value of the field. - */ - public Optional<Object> getField(String name) { - for (int index = 0; index < fieldNames.size(); index++) { - if (fieldNames.get(index).equals(name)) { - return Optional.ofNullable(fieldValues.get(index)); - } - } - - return Optional.empty(); - } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj; + return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, other.samzaSqlRelRecord); } } http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java index 45542ca..c3906bd 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java @@ -30,7 +30,7 @@ import org.codehaus.jackson.type.TypeReference; /** * A serializer for {@link SamzaSqlRelMessage}. This serializer preserves the type information as - * {@link SamzaSqlRelMessage} contains nested {@link org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord} + * {@link SamzaSqlRelMessage} contains nested {@link org.apache.samza.sql.SamzaSqlRelRecord} * records. */ public final class SamzaSqlRelMessageSerdeFactory implements SerdeFactory<SamzaSqlRelMessage> { http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java index 8a22047..a78bcda 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java @@ -23,37 +23,37 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; /** - * A serializer for {@link SamzaSqlRelMessage.SamzaSqlRelRecord}. This serializer preserves the type information as - * {@link SamzaSqlRelMessage.SamzaSqlRelRecord} and contains nested {@link SamzaSqlRelMessage.SamzaSqlRelRecord} + * A serializer for {@link SamzaSqlRelRecord}. This serializer preserves the type information as + * {@link SamzaSqlRelRecord} and contains nested {@link SamzaSqlRelRecord} * records. */ -public final class SamzaSqlRelRecordSerdeFactory implements SerdeFactory<SamzaSqlRelMessage.SamzaSqlRelRecord> { - public Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> getSerde(String name, Config config) { +public final class SamzaSqlRelRecordSerdeFactory implements SerdeFactory<SamzaSqlRelRecord> { + public Serde<SamzaSqlRelRecord> getSerde(String name, Config config) { return new SamzaSqlRelRecordSerde(); } - public final static class SamzaSqlRelRecordSerde implements Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> { + public final static class SamzaSqlRelRecordSerde implements Serde<SamzaSqlRelRecord> { @Override - public SamzaSqlRelMessage.SamzaSqlRelRecord fromBytes(byte[] bytes) { + public SamzaSqlRelRecord fromBytes(byte[] bytes) { try { ObjectMapper mapper = new ObjectMapper(); // Enable object typing to handle nested records mapper.enableDefaultTyping(); - return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<SamzaSqlRelMessage.SamzaSqlRelRecord>() {}); + return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<SamzaSqlRelRecord>() {}); } catch (Exception e) { throw new SamzaException(e); } } @Override - public byte[] toBytes(SamzaSqlRelMessage.SamzaSqlRelRecord p) { + public byte[] toBytes(SamzaSqlRelRecord p) { try { ObjectMapper mapper = new ObjectMapper(); // Enable object typing to handle nested records http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java deleted file mode 100644 index 14ca3f0..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java +++ /dev/null @@ -1,102 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.samza.config.MapConfig; -import org.apache.samza.operators.KV; -import org.apache.samza.sql.avro.AvroRelConverter; -import org.apache.samza.sql.avro.AvroRelSchemaProvider; -import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory; -import org.apache.samza.sql.avro.schemas.AddressRecord; -import org.apache.samza.sql.avro.schemas.Profile; -import org.apache.samza.sql.avro.schemas.StreetNumRecord; -import org.apache.samza.sql.data.SamzaSqlRelMessage; -import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; -import org.apache.samza.system.SystemStream; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde; - - -public class TestSamzaSqlRelMessageSerde { - - private List<Object> values = Arrays.asList("value1", 1, null); - private List<String> names = Arrays.asList("field1", "field2", "field3"); - - @Test - public void testWithDifferentFields() { - SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); - SamzaSqlRelMessageSerde serde = - (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); - SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message)); - Assert.assertEquals(names, resultMsg.getSamzaSqlRelRecord().getFieldNames()); - Assert.assertEquals(values, resultMsg.getSamzaSqlRelRecord().getFieldValues()); - } - - @Test - public void testNestedRecordConversion() { - Map<String, String> props = new HashMap<>(); - SystemStream ss1 = new SystemStream("test", "nestedRecord"); - props.put( - String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()), - Profile.SCHEMA$.toString()); - ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory(); - AvroRelSchemaProvider nestedRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props)); - AvroRelConverter nestedRecordAvroRelConverter = new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig()); - - Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair = - createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter); - SamzaSqlRelMessageSerde serde = - (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); - SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(messageRecordPair.getKey())); - KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg); - GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue(); - - for (Schema.Field field : Profile.SCHEMA$.getFields()) { - // equals() on GenericRecord does the nested record equality check as well. - Assert.assertEquals(messageRecordPair.getValue().get(field.name()), recordPostConversion.get(field.name())); - } - } - - public static Pair<SamzaSqlRelMessage, GenericData.Record> createNestedSamzaSqlRelMessage( - AvroRelConverter nestedRecordAvroRelConverter) { - GenericData.Record record = new GenericData.Record(Profile.SCHEMA$); - record.put("id", 1); - record.put("name", "name1"); - record.put("companyId", 0); - GenericData.Record addressRecord = new GenericData.Record(AddressRecord.SCHEMA$); - addressRecord.put("zip", 90000); - record.put("address", addressRecord); - GenericData.Record streetNumRecord = new GenericData.Record(StreetNumRecord.SCHEMA$); - streetNumRecord.put("number", 1200); - addressRecord.put("streetnum", streetNumRecord); - return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)), record); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java deleted file mode 100644 index 25d1c77..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java +++ /dev/null @@ -1,86 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.samza.config.MapConfig; -import org.apache.samza.sql.avro.AvroRelConverter; -import org.apache.samza.sql.avro.AvroRelSchemaProvider; -import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory; -import org.apache.samza.sql.avro.schemas.Profile; -import org.apache.samza.sql.data.SamzaSqlRelMessage; -import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory; -import org.apache.samza.system.SystemStream; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde; -import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord; - - -public class TestSamzaSqlRelRecordSerde { - - private List<Object> values = Arrays.asList("value1", 1, null); - private List<String> names = Arrays.asList("field1", "field2", "field3"); - - @Test - public void testWithDifferentFields() { - SamzaSqlRelRecord record = new SamzaSqlRelMessage(names, values).getSamzaSqlRelRecord(); - SamzaSqlRelRecordSerde serde = - (SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); - SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(record)); - Assert.assertEquals(names, resultRecord.getFieldNames()); - Assert.assertEquals(values, resultRecord.getFieldValues()); - } - - @Test - public void testNestedRecordConversion() { - Map<String, String> props = new HashMap<>(); - SystemStream ss1 = new SystemStream("test", "nestedRecord"); - props.put( - String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()), - Profile.SCHEMA$.toString()); - ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory(); - AvroRelSchemaProvider nestedRecordSchemaProvider = - (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props)); - AvroRelConverter nestedRecordAvroRelConverter = - new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig()); - - Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair = - TestSamzaSqlRelMessageSerde.createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter); - SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde serde = - (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); - SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(messageRecordPair.getKey().getSamzaSqlRelRecord())); - GenericData.Record recordPostConversion = - (GenericData.Record) nestedRecordAvroRelConverter.convertToAvroObject(resultRecord, Profile.SCHEMA$); - - for (Schema.Field field : Profile.SCHEMA$.getFields()) { - // equals() on GenericRecord does the nested record equality check as well. - Assert.assertEquals(messageRecordPair.getValue().get(field.name()), recordPostConversion.get(field.name())); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java index 93e6223..d0a2f59 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java @@ -43,4 +43,22 @@ public class TestSamzaSqlRelMessage { SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent()); } + + @Test + public void testEquality() { + SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values); + SamzaSqlRelMessage message2 = + new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value1", "value2")); + Assert.assertEquals(message1, message2); + Assert.assertEquals(message1.hashCode(), message2.hashCode()); + } + + @Test + public void testInEquality() { + SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values); + SamzaSqlRelMessage message2 = + new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value2", "value2")); + Assert.assertNotEquals(message1, message2); + Assert.assertNotEquals(message1.hashCode(), message2.hashCode()); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java new file mode 100644 index 0000000..a159e2f --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java @@ -0,0 +1,102 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.serializers; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.KV; +import org.apache.samza.sql.avro.AvroRelConverter; +import org.apache.samza.sql.avro.AvroRelSchemaProvider; +import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory; +import org.apache.samza.sql.avro.schemas.AddressRecord; +import org.apache.samza.sql.avro.schemas.Profile; +import org.apache.samza.sql.avro.schemas.StreetNumRecord; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; +import org.apache.samza.system.SystemStream; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde; + + +public class TestSamzaSqlRelMessageSerde { + + private List<Object> values = Arrays.asList("value1", 1, null); + private List<String> names = Arrays.asList("field1", "field2", "field3"); + + @Test + public void testWithDifferentFields() { + SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); + SamzaSqlRelMessageSerde serde = + (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); + SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message)); + Assert.assertEquals(names, resultMsg.getSamzaSqlRelRecord().getFieldNames()); + Assert.assertEquals(values, resultMsg.getSamzaSqlRelRecord().getFieldValues()); + } + + @Test + public void testNestedRecordConversion() { + Map<String, String> props = new HashMap<>(); + SystemStream ss1 = new SystemStream("test", "nestedRecord"); + props.put( + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()), + Profile.SCHEMA$.toString()); + ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory(); + AvroRelSchemaProvider nestedRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props)); + AvroRelConverter nestedRecordAvroRelConverter = new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig()); + + Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair = + createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter); + SamzaSqlRelMessageSerde serde = + (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); + SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(messageRecordPair.getKey())); + KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg); + GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue(); + + for (Schema.Field field : Profile.SCHEMA$.getFields()) { + // equals() on GenericRecord does the nested record equality check as well. + Assert.assertEquals(messageRecordPair.getValue().get(field.name()), recordPostConversion.get(field.name())); + } + } + + public static Pair<SamzaSqlRelMessage, GenericData.Record> createNestedSamzaSqlRelMessage( + AvroRelConverter nestedRecordAvroRelConverter) { + GenericData.Record record = new GenericData.Record(Profile.SCHEMA$); + record.put("id", 1); + record.put("name", "name1"); + record.put("companyId", 0); + GenericData.Record addressRecord = new GenericData.Record(AddressRecord.SCHEMA$); + addressRecord.put("zip", 90000); + record.put("address", addressRecord); + GenericData.Record streetNumRecord = new GenericData.Record(StreetNumRecord.SCHEMA$); + streetNumRecord.put("number", 1200); + addressRecord.put("streetnum", streetNumRecord); + return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)), record); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java new file mode 100644 index 0000000..d264f01 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java @@ -0,0 +1,85 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.serializers; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.SamzaSqlRelRecord; +import org.apache.samza.sql.avro.AvroRelConverter; +import org.apache.samza.sql.avro.AvroRelSchemaProvider; +import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory; +import org.apache.samza.sql.avro.schemas.Profile; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.system.SystemStream; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde; + + +public class TestSamzaSqlRelRecordSerde { + + private List<Object> values = Arrays.asList("value1", 1, null); + private List<String> names = Arrays.asList("field1", "field2", "field3"); + + @Test + public void testWithDifferentFields() { + SamzaSqlRelRecord record = new SamzaSqlRelMessage(names, values).getSamzaSqlRelRecord(); + SamzaSqlRelRecordSerde serde = + (SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); + SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(record)); + Assert.assertEquals(names, resultRecord.getFieldNames()); + Assert.assertEquals(values, resultRecord.getFieldValues()); + } + + @Test + public void testNestedRecordConversion() { + Map<String, String> props = new HashMap<>(); + SystemStream ss1 = new SystemStream("test", "nestedRecord"); + props.put( + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()), + Profile.SCHEMA$.toString()); + ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory(); + AvroRelSchemaProvider nestedRecordSchemaProvider = + (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props)); + AvroRelConverter nestedRecordAvroRelConverter = + new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig()); + + Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair = + TestSamzaSqlRelMessageSerde.createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter); + SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde serde = + (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); + SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(messageRecordPair.getKey().getSamzaSqlRelRecord())); + GenericData.Record recordPostConversion = + (GenericData.Record) nestedRecordAvroRelConverter.convertToAvroObject(resultRecord, Profile.SCHEMA$); + + for (Schema.Field field : Profile.SCHEMA$.getFields()) { + // equals() on GenericRecord does the nested record equality check as well. + Assert.assertEquals(messageRecordPair.getValue().get(field.name()), recordPostConversion.get(field.name())); + } + } +} \ No newline at end of file
