[
https://issues.apache.org/jira/browse/BEAM-5866?focusedWorklogId=167208&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167208
]
ASF GitHub Bot logged work on BEAM-5866:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Nov/18 19:47
Start Date: 17/Nov/18 19:47
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #6845: [BEAM-5866] Fix
`Row#equals`
URL: https://github.com/apache/beam/pull/6845
This is a PR merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it is merged, the diff is
reproduced below for the sake of provenance:
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index b078f742fe9..77761967f03 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -346,13 +346,126 @@ public boolean equals(Object o) {
return false;
}
Row other = (Row) o;
- return Objects.equals(getSchema(), other.getSchema())
- && Objects.deepEquals(getValues().toArray(),
other.getValues().toArray());
+
+ if (!Objects.equals(getSchema(), other.getSchema())) {
+ return false;
+ }
+
+ for (int i = 0; i < getFieldCount(); i++) {
+ if (!Equals.deepEquals(getValue(i), other.getValue(i),
getSchema().getField(i).getType())) {
+ return false;
+ }
+ }
+
+ return true;
}
@Override
public int hashCode() {
- return Arrays.deepHashCode(new Object[] {getSchema(),
getValues().toArray()});
+ int h = 1;
+ for (int i = 0; i < getFieldCount(); i++) {
+ h = 31 * h + Equals.deepHashCode(getValue(i),
getSchema().getField(i).getType());
+ }
+
+ return h;
+ }
+
+ static class Equals {
+ static boolean deepEquals(Object a, Object b, Schema.FieldType fieldType) {
+ if (fieldType.getTypeName() == Schema.TypeName.BYTES) {
+ return Arrays.equals((byte[]) a, (byte[]) b);
+ } else if (fieldType.getTypeName() == Schema.TypeName.ARRAY) {
+ return deepEqualsForList(
+ (List<Object>) a, (List<Object>) b,
fieldType.getCollectionElementType());
+ } else if (fieldType.getTypeName() == Schema.TypeName.MAP) {
+ return deepEqualsForMap(
+ (Map<Object, Object>) a, (Map<Object, Object>) b,
fieldType.getMapValueType());
+ } else {
+ return Objects.equals(a, b);
+ }
+ }
+
+ static int deepHashCode(Object a, Schema.FieldType fieldType) {
+ if (fieldType.getTypeName() == Schema.TypeName.BYTES) {
+ return Arrays.hashCode((byte[]) a);
+ } else if (fieldType.getTypeName() == Schema.TypeName.ARRAY) {
+ return deepHashCodeForList((List<Object>) a,
fieldType.getCollectionElementType());
+ } else if (fieldType.getTypeName() == Schema.TypeName.MAP) {
+ return deepHashCodeForMap(
+ (Map<Object, Object>) a, fieldType.getMapKeyType(),
fieldType.getMapValueType());
+ } else {
+ return Objects.hashCode(a);
+ }
+ }
+
+ static <K, V> boolean deepEqualsForMap(Map<K, V> a, Map<K, V> b,
Schema.FieldType valueType) {
+ if (a == b) {
+ return true;
+ }
+
+ if (a.size() != b.size()) {
+ return false;
+ }
+
+ for (Map.Entry<K, V> e : a.entrySet()) {
+ K key = e.getKey();
+ V value = e.getValue();
+ V otherValue = b.get(key);
+
+ if (value == null) {
+ if (otherValue != null || !b.containsKey(key)) {
+ return false;
+ }
+ } else {
+ if (!deepEquals(value, otherValue, valueType)) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ static int deepHashCodeForMap(
+ Map<Object, Object> a, Schema.FieldType keyType, Schema.FieldType
valueType) {
+ int h = 0;
+
+ for (Map.Entry<Object, Object> e : a.entrySet()) {
+ Object key = e.getKey();
+ Object value = e.getValue();
+
+ h += deepHashCode(key, keyType) ^ deepHashCode(value, valueType);
+ }
+
+ return h;
+ }
+
+ static boolean deepEqualsForList(List<Object> a, List<Object> b,
Schema.FieldType elementType) {
+ if (a == b) {
+ return true;
+ }
+
+ if (a.size() != b.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < a.size(); i++) {
+ if (!deepEquals(a.get(i), b.get(i), elementType)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ static int deepHashCodeForList(List<Object> a, Schema.FieldType
elementType) {
+ int h = 1;
+ for (int i = 0; i < a.size(); i++) {
+ h = 31 * h + deepHashCode(a.get(i), elementType);
+ }
+
+ return h;
+ }
}
@Override
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
index b212d7af3b4..536fee17d15 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
@@ -17,32 +17,26 @@
*/
package org.apache.beam.sdk.coders.org.apache.beam.sdk.coders;
-import static org.junit.Assert.assertEquals;
-
import com.google.common.collect.Lists;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.junit.Assume;
+import org.junit.Ignore;
import org.junit.Test;
/** Unit tests for {@link RowCoder}. */
public class RowCoderTest {
- void checkEncodeDecode(Row row) throws IOException {
- RowCoder coder = RowCoder.of(row.getSchema());
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- coder.encode(row, out);
- assertEquals(row, coder.decode(new
ByteArrayInputStream(out.toByteArray())));
- }
-
@Test
public void testPrimitiveTypes() throws Exception {
Schema schema =
@@ -66,7 +60,8 @@ public void testPrimitiveTypes() throws Exception {
.addValues(
(byte) 0, (short) 1, 2, 3L, new BigDecimal(2.3), 1.2f, 3.0d,
"str", dateTime, false)
.build();
- checkEncodeDecode(row);
+
+ CoderProperties.coderDecodeEncodeEqual(RowCoder.of(schema), row);
}
@Test
@@ -77,14 +72,16 @@ public void testNestedTypes() throws Exception {
Row nestedRow = Row.withSchema(nestedSchema).addValues(18,
"foobar").build();
Row row = Row.withSchema(schema).addValues(42, nestedRow).build();
- checkEncodeDecode(row);
+
+ CoderProperties.coderDecodeEncodeEqual(RowCoder.of(schema), row);
}
@Test
public void testArrays() throws Exception {
Schema schema = Schema.builder().addArrayField("f_array",
FieldType.STRING).build();
Row row = Row.withSchema(schema).addArray("one", "two", "three",
"four").build();
- checkEncodeDecode(row);
+
+ CoderProperties.coderDecodeEncodeEqual(RowCoder.of(schema), row);
}
@Test
@@ -99,7 +96,8 @@ public void testArrayOfRow() throws Exception {
Row.withSchema(nestedSchema).addValues(2, "two").build(),
Row.withSchema(nestedSchema).addValues(3, "three").build())
.build();
- checkEncodeDecode(row);
+
+ CoderProperties.coderDecodeEncodeEqual(RowCoder.of(schema), row);
}
@Test
@@ -113,7 +111,8 @@ public void testArrayOfArray() throws Exception {
Lists.newArrayList(5, 6, 7, 8),
Lists.newArrayList(9, 10, 11, 12))
.build();
- checkEncodeDecode(row);
+
+ CoderProperties.coderDecodeEncodeEqual(RowCoder.of(schema), row);
}
@Test(expected = NonDeterministicException.class)
@@ -145,4 +144,70 @@ public void testVerifyDeterministicNestedRow() throws
NonDeterministicException
coder.verifyDeterministic();
}
+
+ @Test
+ public void testConsistentWithEqualsBytesField() throws Exception {
+ Schema schema = Schema.of(Schema.Field.of("f1", FieldType.BYTES));
+ Row row1 = Row.withSchema(schema).addValue(new byte[] {1, 2, 3,
4}).build();
+ Row row2 = Row.withSchema(schema).addValue(new byte[] {1, 2, 3,
4}).build();
+ RowCoder coder = RowCoder.of(schema);
+
+ Assume.assumeTrue(coder.consistentWithEquals());
+
+ CoderProperties.coderConsistentWithEquals(coder, row1, row2);
+ }
+
+ @Test
+ @Ignore
+ public void testConsistentWithEqualsMapWithBytesKeyField() throws Exception {
+ FieldType fieldType = FieldType.map(FieldType.BYTES, FieldType.INT32);
+ Schema schema = Schema.of(Schema.Field.of("f1", fieldType));
+ RowCoder coder = RowCoder.of(schema);
+
+ Map<byte[], Integer> map1 = Collections.singletonMap(new byte[] {1, 2, 3,
4}, 1);
+ Row row1 = Row.withSchema(schema).addValue(map1).build();
+
+ Map<byte[], Integer> map2 = Collections.singletonMap(new byte[] {1, 2, 3,
4}, 1);
+ Row row2 = Row.withSchema(schema).addValue(map2).build();
+
+ Assume.assumeTrue(coder.consistentWithEquals());
+
+ CoderProperties.coderConsistentWithEquals(coder, row1, row2);
+ }
+
+ @Test
+ public void testConsistentWithEqualsArrayOfBytes() throws Exception {
+ FieldType fieldType = FieldType.array(FieldType.BYTES);
+ Schema schema = Schema.of(Schema.Field.of("f1", fieldType));
+ RowCoder coder = RowCoder.of(schema);
+
+ List<byte[]> list1 = Collections.singletonList(new byte[] {1, 2, 3, 4});
+ Row row1 = Row.withSchema(schema).addValue(list1).build();
+
+ List<byte[]> list2 = Collections.singletonList(new byte[] {1, 2, 3, 4});
+ Row row2 = Row.withSchema(schema).addValue(list2).build();
+
+ Assume.assumeTrue(coder.consistentWithEquals());
+
+ CoderProperties.coderConsistentWithEquals(coder, row1, row2);
+ }
+
+ @Test
+ public void testConsistentWithEqualsArrayOfArrayOfBytes() throws Exception {
+ FieldType fieldType = FieldType.array(FieldType.array(FieldType.BYTES));
+ Schema schema = Schema.of(Schema.Field.of("f1", fieldType));
+ RowCoder coder = RowCoder.of(schema);
+
+ List<byte[]> innerList1 = Collections.singletonList(new byte[] {1, 2, 3,
4});
+ List<List<byte[]>> list1 = Collections.singletonList(innerList1);
+ Row row1 = Row.withSchema(schema).addValue(list1).build();
+
+ List<byte[]> innerList2 = Collections.singletonList(new byte[] {1, 2, 3,
4});
+ List<List<byte[]>> list2 = Collections.singletonList(innerList2);
+ Row row2 = Row.withSchema(schema).addValue(list2).build();
+
+ Assume.assumeTrue(coder.consistentWithEquals());
+
+ CoderProperties.coderConsistentWithEquals(coder, row1, row2);
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 167208)
Time Spent: 7h 20m (was: 7h 10m)
> RowCoder doesn't implement structuralValue
> ------------------------------------------
>
> Key: BEAM-5866
> URL: https://issues.apache.org/jira/browse/BEAM-5866
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Reporter: Gleb Kanterov
> Assignee: Gleb Kanterov
> Priority: Major
> Time Spent: 7h 20m
> Remaining Estimate: 0h
>
> These two properties fail for a RowCoder whose schema contains a `BYTES`
> field or a `Map<BYTES, ?>` field:
> {code}
> public static <T> void testConsistentWithEquals(Coder<T> coder, T example) {
> assumeTrue(coder.consistentWithEquals());
> byte[] bytes = encodeBytes(coder, example);
> // even if the coder is non-deterministic, if the encoded bytes match,
> // coder is consistent with equals, decoded values must be equal
> T out0 = decodeBytes(coder, bytes);
> T out1 = decodeBytes(coder, bytes);
> assertEquals("If the encoded bytes match, decoded values must be equal",
> out0, out1);
> assertEquals(
> "If two values are equal, their hash codes must be equal",
> out0.hashCode(),
> out1.hashCode());
> }
> public static <T> void testStructuralValueConsistentWithEquals(Coder<T>
> coder, T example) {
> byte[] bytes = encodeBytes(coder, example);
> // even if coder is non-deterministic, if the encoded bytes match,
> // structural values must be equal
> Object out0 = coder.structuralValue(decodeBytes(coder, bytes));
> Object out1 = coder.structuralValue(decodeBytes(coder, bytes));
> assertEquals("If the encoded bytes match, structural values must be
> equal", out0, out1);
> assertEquals(
> "If two values are equal, their hash codes must be equal",
> out0.hashCode(),
> out1.hashCode());
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)