Repository: samza
Updated Branches:
  refs/heads/master 2a9e729ad -> af4ddc4a9


SAMZA-1740: Moving SamzaSqlRelRecord to samza-api as it needs to be used in 
UDFs

Please see the description in the ticket. This change also implements the equals and hashCode 
methods for SamzaSqlRelRecord and SamzaSqlRelMessage.

Author: Aditya Toomula <[email protected]>

Reviewers: Srini P<[email protected]>, Jagadish <[email protected]>

Closes #545 from atoomula/sql


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/af4ddc4a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/af4ddc4a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/af4ddc4a

Branch: refs/heads/master
Commit: af4ddc4a97c63ff167e33727ba94da91500b7f42
Parents: 2a9e729
Author: Aditya Toomula <[email protected]>
Authored: Mon Jun 11 10:49:45 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon Jun 11 10:49:45 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/sql/SamzaSqlRelRecord.java | 113 +++++++++++++++++++
 .../apache/samza/sql/TestSamzaSqlRelRecord.java |  43 +++++++
 .../apache/samza/sql/avro/AvroRelConverter.java |   3 +-
 .../samza/sql/data/SamzaSqlRelMessage.java      |  85 +++-----------
 .../SamzaSqlRelMessageSerdeFactory.java         |   2 +-
 .../SamzaSqlRelRecordSerdeFactory.java          |  18 +--
 .../samza/sql/TestSamzaSqlRelMessageSerde.java  | 102 -----------------
 .../samza/sql/TestSamzaSqlRelRecordSerde.java   |  86 --------------
 .../samza/sql/data/TestSamzaSqlRelMessage.java  |  18 +++
 .../TestSamzaSqlRelMessageSerde.java            | 102 +++++++++++++++++
 .../serializers/TestSamzaSqlRelRecordSerde.java |  85 ++++++++++++++
 11 files changed, 389 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java 
b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
new file mode 100644
index 0000000..e17a273
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
@@ -0,0 +1,113 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.samza.annotation.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Samza sql relational record. A record consists of list of column values and 
the associated column names.
+ * A column value could be nested, meaning, it could be another 
SamzaSqlRelRecord.
+ * Right now we do not store any metadata (like nullability, etc) other than 
the column name in the SamzaSqlRelRecord.
+ */
[email protected]
+public class SamzaSqlRelRecord implements Serializable {
+
+  @JsonProperty("fieldNames")
+  private final ArrayList<String> fieldNames;
+  @JsonProperty("fieldValues")
+  private final ArrayList<Object> fieldValues;
+
+  /**
+   * Creates a {@link SamzaSqlRelRecord} from the list of relational fields 
and values.
+   * @param fieldNames Ordered list of field names in the row.
+   * @param fieldValues  Ordered list of all the values in the row. Some of 
the fields can be null. This could be
+   *                     result of delete change capture event in the stream 
or because of the result of the outer
+   *                     join or the fields themselves are null in the 
original stream.
+   */
+  public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> fieldNames,
+      @JsonProperty("fieldValues") List<Object> fieldValues) {
+    if (fieldNames.size() != fieldValues.size()) {
+      throw new IllegalArgumentException("Field Names and values are not of 
same length.");
+    }
+
+    this.fieldNames = new ArrayList<>();
+    this.fieldValues = new ArrayList<>();
+
+    this.fieldNames.addAll(fieldNames);
+    this.fieldValues.addAll(fieldValues);
+  }
+
+  /**
+   * Get the field names of all the columns in the relational message.
+   * @return the field names of all columns.
+   */
+  @JsonProperty("fieldNames")
+  public List<String> getFieldNames() {
+    return this.fieldNames;
+  }
+
+  /**
+   * Get the field values of all the columns in the relational message.
+   * @return the field values of all columns.
+   */
+  @JsonProperty("fieldValues")
+  public List<Object> getFieldValues() {
+    return this.fieldValues;
+  }
+
+  /**
+   * Get the value of the field corresponding to the field name.
+   * @param name Name of the field.
+   * @return returns the value of the field.
+   */
+  public Optional<Object> getField(String name) {
+    for (int index = 0; index < fieldNames.size(); index++) {
+      if (fieldNames.get(index).equals(name)) {
+        return Optional.ofNullable(fieldValues.get(index));
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fieldNames, fieldValues);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    SamzaSqlRelRecord other = (SamzaSqlRelRecord) obj;
+    return Objects.equals(fieldNames, other.fieldNames) && 
Objects.equals(fieldValues, other.fieldValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java 
b/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java
new file mode 100644
index 0000000..ac27991
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql;
+
+import java.util.Arrays;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestSamzaSqlRelRecord {
+  @Test
+  public void testEquality() {
+    SamzaSqlRelRecord relRecord1 = new SamzaSqlRelRecord(Arrays.asList("id", 
"name"), Arrays.asList(1L, "object"));
+    SamzaSqlRelRecord relRecord2 = new SamzaSqlRelRecord(Arrays.asList("id", 
"name"), Arrays.asList(1L, "object"));
+    assertEquals(relRecord1, relRecord2);
+    assertEquals(relRecord1.hashCode(), relRecord2.hashCode());
+  }
+
+  @Test
+  public void testInEquality() {
+    SamzaSqlRelRecord relRecord1 = new SamzaSqlRelRecord(Arrays.asList("id", 
"name"), Arrays.asList(1L, "object"));
+    SamzaSqlRelRecord relRecord2 = new SamzaSqlRelRecord(Arrays.asList("id", 
"name"), Arrays.asList(1L, null));
+    assertNotEquals(relRecord1, relRecord2);
+    assertNotEquals(relRecord1.hashCode(), relRecord2.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index c9c30cc..ed22cc5 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -32,14 +32,13 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.system.SystemStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
-
 
 /**
  * This class converts a Samza Avro messages to Relational messages and vice 
versa.

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 9bf1870..3ebbb23 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -22,14 +22,9 @@ package org.apache.samza.sql.data;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
+import java.util.Objects;
 import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 
@@ -96,6 +91,7 @@ public class SamzaSqlRelMessage implements Serializable {
 
   /**
    * Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}.
+   * @param samzaSqlRelRecord represents the rel record.
    */
   public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") 
SamzaSqlRelRecord samzaSqlRelRecord) {
     this(samzaSqlRelRecord.getFieldNames(), 
samzaSqlRelRecord.getFieldValues());
@@ -110,67 +106,20 @@ public class SamzaSqlRelMessage implements Serializable {
     return key;
   }
 
-  /**
-   * Samza sql relational record. A record consists of list of column values 
and the associated column names.
-   * A column value could be nested, meaning, it could be another 
SamzaSqlRelRecord.
-   * Right now we do not store any metadata (like nullability, etc) other than 
the column name in the SamzaSqlRelRecord.
-   */
-  public static class SamzaSqlRelRecord implements Serializable {
-
-    @JsonProperty("fieldNames")
-    private final List<String> fieldNames;
-    @JsonProperty("fieldValues")
-    private final List<Object> fieldValues;
-
-    /**
-     * Creates a {@link SamzaSqlRelRecord} from the list of relational fields 
and values.
-     * @param fieldNames Ordered list of field names in the row.
-     * @param fieldValues  Ordered list of all the values in the row. Some of 
the fields can be null. This could be
-     *                     result of delete change capture event in the stream 
or because of the result of the outer
-     *                     join or the fields themselves are null in the 
original stream.
-     */
-    public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> 
fieldNames,
-        @JsonProperty("fieldValues") List<Object> fieldValues) {
-      Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names 
and values are not of same length.");
-
-      this.fieldNames = new ArrayList<>();
-      this.fieldValues = new ArrayList<>();
-
-      this.fieldNames.addAll(fieldNames);
-      this.fieldValues.addAll(fieldValues);
-    }
-
-    /**
-     * Get the field names of all the columns in the relational message.
-     * @return the field names of all columns.
-     */
-    @JsonProperty("fieldNames")
-    public List<String> getFieldNames() {
-      return this.fieldNames;
-    }
-
-    /**
-     * Get the field values of all the columns in the relational message.
-     * @return the field values of all columns.
-     */
-    @JsonProperty("fieldValues")
-    public List<Object> getFieldValues() {
-      return this.fieldValues;
-    }
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, samzaSqlRelRecord);
+  }
 
-    /**
-     * Get the value of the field corresponding to the field name.
-     * @param name Name of the field.
-     * @return returns the value of the field.
-     */
-    public Optional<Object> getField(String name) {
-      for (int index = 0; index < fieldNames.size(); index++) {
-        if (fieldNames.get(index).equals(name)) {
-          return Optional.ofNullable(fieldValues.get(index));
-        }
-      }
-
-      return Optional.empty();
-    }
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj;
+    return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, 
other.samzaSqlRelRecord);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
index 45542ca..c3906bd 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
@@ -30,7 +30,7 @@ import org.codehaus.jackson.type.TypeReference;
 
 /**
  * A serializer for {@link SamzaSqlRelMessage}. This serializer preserves the 
type information as
- * {@link SamzaSqlRelMessage} contains nested {@link 
org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord}
+ * {@link SamzaSqlRelMessage} contains nested {@link 
org.apache.samza.sql.SamzaSqlRelRecord}
  * records.
  */
 public final class SamzaSqlRelMessageSerdeFactory implements 
SerdeFactory<SamzaSqlRelMessage> {

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
index 8a22047..a78bcda 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
@@ -23,37 +23,37 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 
 /**
- * A serializer for {@link SamzaSqlRelMessage.SamzaSqlRelRecord}. This 
serializer preserves the type information as
- * {@link SamzaSqlRelMessage.SamzaSqlRelRecord} and contains nested {@link 
SamzaSqlRelMessage.SamzaSqlRelRecord}
+ * A serializer for {@link SamzaSqlRelRecord}. This serializer preserves the 
type information as
+ * {@link SamzaSqlRelRecord} can contain nested {@link SamzaSqlRelRecord}
  * records.
  */
-public final class SamzaSqlRelRecordSerdeFactory implements 
SerdeFactory<SamzaSqlRelMessage.SamzaSqlRelRecord> {
-  public Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> getSerde(String name, 
Config config) {
+public final class SamzaSqlRelRecordSerdeFactory implements 
SerdeFactory<SamzaSqlRelRecord> {
+  public Serde<SamzaSqlRelRecord> getSerde(String name, Config config) {
     return new SamzaSqlRelRecordSerde();
   }
 
-  public final static class SamzaSqlRelRecordSerde implements 
Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> {
+  public final static class SamzaSqlRelRecordSerde implements 
Serde<SamzaSqlRelRecord> {
 
     @Override
-    public SamzaSqlRelMessage.SamzaSqlRelRecord fromBytes(byte[] bytes) {
+    public SamzaSqlRelRecord fromBytes(byte[] bytes) {
       try {
         ObjectMapper mapper = new ObjectMapper();
         // Enable object typing to handle nested records
         mapper.enableDefaultTyping();
-        return mapper.readValue(new String(bytes, "UTF-8"), new 
TypeReference<SamzaSqlRelMessage.SamzaSqlRelRecord>() {});
+        return mapper.readValue(new String(bytes, "UTF-8"), new 
TypeReference<SamzaSqlRelRecord>() {});
       } catch (Exception e) {
         throw new SamzaException(e);
       }
     }
 
     @Override
-    public byte[] toBytes(SamzaSqlRelMessage.SamzaSqlRelRecord p) {
+    public byte[] toBytes(SamzaSqlRelRecord p) {
       try {
         ObjectMapper mapper = new ObjectMapper();
         // Enable object typing to handle nested records

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
deleted file mode 100644
index 14ca3f0..0000000
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.KV;
-import org.apache.samza.sql.avro.AvroRelConverter;
-import org.apache.samza.sql.avro.AvroRelSchemaProvider;
-import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
-import org.apache.samza.sql.avro.schemas.AddressRecord;
-import org.apache.samza.sql.avro.schemas.Profile;
-import org.apache.samza.sql.avro.schemas.StreetNumRecord;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
-import org.apache.samza.system.SystemStream;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static 
org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde;
-
-
-public class TestSamzaSqlRelMessageSerde {
-
-  private List<Object> values = Arrays.asList("value1", 1, null);
-  private List<String> names = Arrays.asList("field1", "field2", "field3");
-
-  @Test
-  public void testWithDifferentFields() {
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
-    SamzaSqlRelMessageSerde serde =
-        (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
-    SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message));
-    Assert.assertEquals(names, 
resultMsg.getSamzaSqlRelRecord().getFieldNames());
-    Assert.assertEquals(values, 
resultMsg.getSamzaSqlRelRecord().getFieldValues());
-  }
-
-  @Test
-  public void testNestedRecordConversion() {
-    Map<String, String> props = new HashMap<>();
-    SystemStream ss1 = new SystemStream("test", "nestedRecord");
-    props.put(
-        
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, 
ss1.getSystem(), ss1.getStream()),
-        Profile.SCHEMA$.toString());
-    ConfigBasedAvroRelSchemaProviderFactory factory = new 
ConfigBasedAvroRelSchemaProviderFactory();
-    AvroRelSchemaProvider nestedRecordSchemaProvider = (AvroRelSchemaProvider) 
factory.create(ss1, new MapConfig(props));
-    AvroRelConverter nestedRecordAvroRelConverter = new AvroRelConverter(ss1, 
nestedRecordSchemaProvider, new MapConfig());
-
-    Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair =
-        createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter);
-    SamzaSqlRelMessageSerde serde =
-        (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
-    SamzaSqlRelMessage resultMsg = 
serde.fromBytes(serde.toBytes(messageRecordPair.getKey()));
-    KV<Object, Object> samzaMessage = 
nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg);
-    GenericRecord recordPostConversion = (GenericRecord) 
samzaMessage.getValue();
-
-    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
-      // equals() on GenericRecord does the nested record equality check as 
well.
-      Assert.assertEquals(messageRecordPair.getValue().get(field.name()), 
recordPostConversion.get(field.name()));
-    }
-  }
-
-  public static Pair<SamzaSqlRelMessage, GenericData.Record> 
createNestedSamzaSqlRelMessage(
-      AvroRelConverter nestedRecordAvroRelConverter) {
-    GenericData.Record record = new GenericData.Record(Profile.SCHEMA$);
-    record.put("id", 1);
-    record.put("name", "name1");
-    record.put("companyId", 0);
-    GenericData.Record addressRecord = new 
GenericData.Record(AddressRecord.SCHEMA$);
-    addressRecord.put("zip", 90000);
-    record.put("address", addressRecord);
-    GenericData.Record streetNumRecord = new 
GenericData.Record(StreetNumRecord.SCHEMA$);
-    streetNumRecord.put("number", 1200);
-    addressRecord.put("streetnum", streetNumRecord);
-    return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new 
KV<>("key", record)), record);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
deleted file mode 100644
index 25d1c77..0000000
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.avro.AvroRelConverter;
-import org.apache.samza.sql.avro.AvroRelSchemaProvider;
-import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
-import org.apache.samza.sql.avro.schemas.Profile;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
-import org.apache.samza.system.SystemStream;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static 
org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
-
-
-public class TestSamzaSqlRelRecordSerde {
-
-  private List<Object> values = Arrays.asList("value1", 1, null);
-  private List<String> names = Arrays.asList("field1", "field2", "field3");
-
-  @Test
-  public void testWithDifferentFields() {
-    SamzaSqlRelRecord record = new SamzaSqlRelMessage(names, 
values).getSamzaSqlRelRecord();
-    SamzaSqlRelRecordSerde serde =
-        (SamzaSqlRelRecordSerde) new 
SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
-    SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(record));
-    Assert.assertEquals(names, resultRecord.getFieldNames());
-    Assert.assertEquals(values, resultRecord.getFieldValues());
-  }
-
-  @Test
-  public void testNestedRecordConversion() {
-    Map<String, String> props = new HashMap<>();
-    SystemStream ss1 = new SystemStream("test", "nestedRecord");
-    props.put(
-        
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, 
ss1.getSystem(), ss1.getStream()),
-        Profile.SCHEMA$.toString());
-    ConfigBasedAvroRelSchemaProviderFactory factory = new 
ConfigBasedAvroRelSchemaProviderFactory();
-    AvroRelSchemaProvider nestedRecordSchemaProvider =
-        (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props));
-    AvroRelConverter nestedRecordAvroRelConverter =
-        new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig());
-
-    Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair =
-        
TestSamzaSqlRelMessageSerde.createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter);
-    SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde serde =
-        (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new 
SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
-    SamzaSqlRelRecord resultRecord = 
serde.fromBytes(serde.toBytes(messageRecordPair.getKey().getSamzaSqlRelRecord()));
-    GenericData.Record recordPostConversion =
-        (GenericData.Record) 
nestedRecordAvroRelConverter.convertToAvroObject(resultRecord, Profile.SCHEMA$);
-
-    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
-      // equals() on GenericRecord does the nested record equality check as 
well.
-      Assert.assertEquals(messageRecordPair.getValue().get(field.name()), 
recordPostConversion.get(field.name()));
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java 
b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
index 93e6223..d0a2f59 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
@@ -43,4 +43,22 @@ public class TestSamzaSqlRelMessage {
     SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
     
Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent());
   }
+
+  @Test
+  public void testEquality() {
+    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message2 =
+        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), 
Arrays.asList("value1", "value2"));
+    Assert.assertEquals(message1, message2);
+    Assert.assertEquals(message1.hashCode(), message2.hashCode());
+  }
+
+  @Test
+  public void testInEquality() {
+    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message2 =
+        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), 
Arrays.asList("value2", "value2"));
+    Assert.assertNotEquals(message1, message2);
+    Assert.assertNotEquals(message1.hashCode(), message2.hashCode());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
new file mode 100644
index 0000000..a159e2f
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
@@ -0,0 +1,102 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.serializers;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.AddressRecord;
+import org.apache.samza.sql.avro.schemas.Profile;
+import org.apache.samza.sql.avro.schemas.StreetNumRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.system.SystemStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static 
org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde;
+
+
+/** Checks that {@link SamzaSqlRelMessage} content survives a toBytes/fromBytes round trip through its serde. */
+public class TestSamzaSqlRelMessageSerde {
+  private List<String> names = Arrays.asList("field1", "field2", "field3");
+  private List<Object> values = Arrays.asList("value1", 1, null);
+
+  @Test
+  public void testWithDifferentFields() {
+    SamzaSqlRelMessage original = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessageSerde messageSerde = createSerde();
+    SamzaSqlRelMessage roundTripped = messageSerde.fromBytes(messageSerde.toBytes(original));
+    Assert.assertEquals(names, roundTripped.getSamzaSqlRelRecord().getFieldNames());
+    Assert.assertEquals(values, roundTripped.getSamzaSqlRelRecord().getFieldValues());
+  }
+
+  @Test
+  public void testNestedRecordConversion() {
+    SystemStream stream = new SystemStream("test", "nestedRecord");
+    String schemaKey =
+        String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, stream.getSystem(), stream.getStream());
+    Map<String, String> schemaConfig = new HashMap<>();
+    schemaConfig.put(schemaKey, Profile.SCHEMA$.toString());
+    AvroRelSchemaProvider schemaProvider =
+        (AvroRelSchemaProvider) new ConfigBasedAvroRelSchemaProviderFactory().create(stream, new MapConfig(schemaConfig));
+    AvroRelConverter converter = new AvroRelConverter(stream, schemaProvider, new MapConfig());
+
+    Pair<SamzaSqlRelMessage, GenericData.Record> messageAndRecord = createNestedSamzaSqlRelMessage(converter);
+    SamzaSqlRelMessageSerde messageSerde = createSerde();
+    SamzaSqlRelMessage roundTripped = messageSerde.fromBytes(messageSerde.toBytes(messageAndRecord.getKey()));
+    GenericRecord converted = (GenericRecord) converter.convertToSamzaMessage(roundTripped).getValue();
+    // equals() on GenericRecord also compares nested records, so checking each top-level field is sufficient.
+    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
+      Assert.assertEquals(messageAndRecord.getValue().get(field.name()), converted.get(field.name()));
+    }
+  }
+
+  /** Builds a Profile record with a nested address and street number, paired with its converted rel message. */
+  public static Pair<SamzaSqlRelMessage, GenericData.Record> createNestedSamzaSqlRelMessage(
+      AvroRelConverter nestedRecordAvroRelConverter) {
+    GenericData.Record streetNum = new GenericData.Record(StreetNumRecord.SCHEMA$);
+    streetNum.put("number", 1200);
+    GenericData.Record address = new GenericData.Record(AddressRecord.SCHEMA$);
+    address.put("zip", 90000);
+    address.put("streetnum", streetNum);
+    GenericData.Record profile = new GenericData.Record(Profile.SCHEMA$);
+    profile.put("id", 1);
+    profile.put("name", "name1");
+    profile.put("companyId", 0);
+    profile.put("address", address);
+    return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", profile)), profile);
+  }
+
+  /** Creates the serde under test via its factory, passing null for both factory arguments. */
+  private static SamzaSqlRelMessageSerde createSerde() {
+    return (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/af4ddc4a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java
new file mode 100644
index 0000000..d264f01
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.serializers;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.Profile;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.system.SystemStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static 
org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
+
+
+/** Checks that {@link SamzaSqlRelRecord} content survives a toBytes/fromBytes round trip through its serde. */
+public class TestSamzaSqlRelRecordSerde {
+  private List<String> names = Arrays.asList("field1", "field2", "field3");
+  private List<Object> values = Arrays.asList("value1", 1, null);
+
+  @Test
+  public void testWithDifferentFields() {
+    SamzaSqlRelRecord original = new SamzaSqlRelMessage(names, values).getSamzaSqlRelRecord();
+    SamzaSqlRelRecordSerde recordSerde = createSerde();
+    SamzaSqlRelRecord roundTripped = recordSerde.fromBytes(recordSerde.toBytes(original));
+    Assert.assertEquals(names, roundTripped.getFieldNames());
+    Assert.assertEquals(values, roundTripped.getFieldValues());
+  }
+
+  @Test
+  public void testNestedRecordConversion() {
+    SystemStream stream = new SystemStream("test", "nestedRecord");
+    String schemaKey =
+        String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, stream.getSystem(), stream.getStream());
+    Map<String, String> schemaConfig = new HashMap<>();
+    schemaConfig.put(schemaKey, Profile.SCHEMA$.toString());
+    AvroRelSchemaProvider schemaProvider =
+        (AvroRelSchemaProvider) new ConfigBasedAvroRelSchemaProviderFactory().create(stream, new MapConfig(schemaConfig));
+    AvroRelConverter converter = new AvroRelConverter(stream, schemaProvider, new MapConfig());
+
+    Pair<SamzaSqlRelMessage, GenericData.Record> messageAndRecord =
+        TestSamzaSqlRelMessageSerde.createNestedSamzaSqlRelMessage(converter);
+    SamzaSqlRelRecordSerde recordSerde = createSerde();
+    SamzaSqlRelRecord roundTripped =
+        recordSerde.fromBytes(recordSerde.toBytes(messageAndRecord.getKey().getSamzaSqlRelRecord()));
+    GenericData.Record converted = (GenericData.Record) converter.convertToAvroObject(roundTripped, Profile.SCHEMA$);
+    // equals() on GenericRecord also compares nested records, so checking each top-level field is sufficient.
+    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
+      Assert.assertEquals(messageAndRecord.getValue().get(field.name()), converted.get(field.name()));
+    }
+  }
+
+  /** Creates the serde under test via its factory, passing null for both factory arguments. */
+  private static SamzaSqlRelRecordSerde createSerde() {
+    return (SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+  }
+}
\ No newline at end of file

Reply via email to