[FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3a51971 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3a51971 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3a51971 Branch: refs/heads/master Commit: f3a519712fb31f7b71181e876c3c3d5fff08eb71 Parents: 7667ddc Author: Stephan Ewen <[email protected]> Authored: Tue Mar 20 17:16:06 2018 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Mar 22 15:46:09 2018 +0100 ---------------------------------------------------------------------- .../state/AggregatingStateDescriptor.java | 31 --------- .../common/state/FoldingStateDescriptor.java | 31 --------- .../api/common/state/ListStateDescriptor.java | 30 --------- .../api/common/state/MapStateDescriptor.java | 29 -------- .../common/state/ReducingStateDescriptor.java | 30 --------- .../flink/api/common/state/StateDescriptor.java | 17 ++++- .../api/common/state/ValueStateDescriptor.java | 31 --------- .../common/state/ListStateDescriptorTest.java | 28 ++++++++ .../common/state/MapStateDescriptorTest.java | 29 ++++++++ .../state/ReducingStateDescriptorTest.java | 29 ++++++++ .../api/common/state/StateDescriptorTest.java | 69 ++++++++++++++++++-- .../common/state/ValueStateDescriptorTest.java | 28 ++++++++ 12 files changed, 193 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java index 6f6d2f9..8c7fed6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java @@ -111,35 +111,4 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag public Type getType() { return Type.AGGREGATING; } - - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - else if (o != null && getClass() == o.getClass()) { - AggregatingStateDescriptor<?, ?, ?> that = (AggregatingStateDescriptor<?, ?, ?>) o; - return serializer.equals(that.serializer) && name.equals(that.name); - } - else { - return false; - } - } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; - } - - @Override - public String toString() { - return "AggregatingStateDescriptor{" + - "serializer=" + serializer + - ", aggFunction=" + aggFunction + - '}'; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java index 261d1fe..c14e4bf 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java @@ -112,37 +112,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FoldingStateDescriptor<?, ?> that = (FoldingStateDescriptor<?, ?>) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - - } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; - } - - @Override - public String toString() { - return "FoldingStateDescriptor{" + - "serializer=" + serializer + - ", initialValue=" + defaultValue + - ", foldFunction=" + foldFunction + - '}'; - } - - @Override public Type getType() { return Type.FOLDING; } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java index 38e5680..aa5e64b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java @@ -102,34 +102,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T public Type getType() { return Type.LIST; } - - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o; - return serializer.equals(that.serializer) && name.equals(that.name); - - } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; - } - - @Override - public String toString() { - return "ListStateDescriptor{" + - "serializer=" + serializer + - '}'; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java index 087cb54..42b016a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java @@ -117,33 +117,4 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV> return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer(); } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o; - return serializer.equals(that.serializer) && name.equals(that.name); - } - - @Override - public String toString() { - return "MapStateDescriptor{" + - "name=" + name + - ", serializer=" + serializer + - '}'; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java index ef483e2..0df1c2c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java @@ -98,36 +98,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T> } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - - } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; - } - - @Override - public String toString() { - return "ReducingStateDescriptor{" + - "serializer=" + serializer + - ", reduceFunction=" + reduceFunction + - '}'; - } - - @Override public Type getType() { return Type.REDUCING; } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 574c836..9b6b51d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -273,10 +273,23 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl // ------------------------------------------------------------------------ @Override - public abstract int hashCode(); + public final int hashCode() { + return name.hashCode() + 31 * getClass().hashCode(); + } @Override - public abstract boolean equals(Object o); + public final boolean equals(Object o) { + if (o == this) { + return true; + } + else if (o != null && o.getClass() == this.getClass()) { + final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o; + return this.name.equals(that.name); + } + else { + return false; + } + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java index ef18d74..4d69d81 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java @@ -130,37 +130,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> { } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - - } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; - } - - @Override - public String toString() { - return "ValueStateDescriptor{" + - "name=" + name + - ", defaultValue=" + defaultValue + - ", serializer=" + serializer + - '}'; - } - - @Override public Type getType() { return Type.VALUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java index b934ee0..cb6f608 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.testutils.CommonTestUtils; @@ -62,6 +63,33 @@ public class ListStateDescriptorTest { assertEquals(serializer, copy.getElementSerializer()); } + @Test + public void testHashCodeEquals() throws Exception { + final String name = "testName"; + + ListStateDescriptor<String> original = new ListStateDescriptor<>(name, String.class); + ListStateDescriptor<String> same = new ListStateDescriptor<>(name, String.class); + ListStateDescriptor<String> sameBySerializer = new ListStateDescriptor<>(name, StringSerializer.INSTANCE); + + // test that hashCode() works on state descriptors with initialized and uninitialized serializers + assertEquals(original.hashCode(), same.hashCode()); + assertEquals(original.hashCode(), sameBySerializer.hashCode()); + + assertEquals(original, same); + assertEquals(original, sameBySerializer); + + // equality with a clone + ListStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original); + assertEquals(original, clone); + + // equality with an initialized + clone.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, clone); + + original.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, same); + } + /** * FLINK-6775. * http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java index 4e64c0f..069d6c2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.testutils.CommonTestUtils; @@ -67,6 +68,34 @@ public class MapStateDescriptorTest { assertEquals(valueSerializer, copy.getValueSerializer()); } + @Test + public void testHashCodeEquals() throws Exception { + final String name = "testName"; + + MapStateDescriptor<String, String> original = new MapStateDescriptor<>(name, String.class, String.class); + MapStateDescriptor<String, String> same = new MapStateDescriptor<>(name, String.class, String.class); + MapStateDescriptor<String, String> sameBySerializer = + new MapStateDescriptor<>(name, StringSerializer.INSTANCE, StringSerializer.INSTANCE); + + // test that hashCode() works on state descriptors with initialized and uninitialized serializers + assertEquals(original.hashCode(), same.hashCode()); + assertEquals(original.hashCode(), sameBySerializer.hashCode()); + + assertEquals(original, same); + assertEquals(original, sameBySerializer); + + // equality with a clone + MapStateDescriptor<String, String> clone = CommonTestUtils.createCopySerializable(original); + assertEquals(original, clone); + + // equality with an initialized + clone.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, clone); + + original.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, same); + } + /** * FLINK-6775. * http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java index 5d9eba5..89aa1e6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; @@ -56,4 +57,32 @@ public class ReducingStateDescriptorTest extends TestLogger { assertNotNull(copy.getSerializer()); assertEquals(serializer, copy.getSerializer()); } + + @Test + public void testHashCodeEquals() throws Exception { + final String name = "testName"; + final ReduceFunction<String> reducer = (a, b) -> a; + + ReducingStateDescriptor<String> original = new ReducingStateDescriptor<>(name, reducer, String.class); + ReducingStateDescriptor<String> same = new ReducingStateDescriptor<>(name, reducer, String.class); + ReducingStateDescriptor<String> sameBySerializer = new ReducingStateDescriptor<>(name, reducer, StringSerializer.INSTANCE); + + // test that hashCode() works on state descriptors with initialized and uninitialized serializers + assertEquals(original.hashCode(), same.hashCode()); + assertEquals(original.hashCode(), sameBySerializer.hashCode()); + + assertEquals(original, same); + assertEquals(original, sameBySerializer); + + // equality with a clone + ReducingStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original); + assertEquals(original, clone); + + // equality with an initialized + clone.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, clone); + + original.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, same); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java index cf5327e..3958baa 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java @@ -32,6 +32,7 @@ import java.io.File; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; @@ -159,6 +160,47 @@ public class StateDescriptorTest { } // ------------------------------------------------------------------------ + // Test hashCode() and equals() + // ------------------------------------------------------------------------ + + @Test + public void testHashCodeAndEquals() throws Exception { + final String name = "testName"; + + TestStateDescriptor<String> original = new TestStateDescriptor<>(name, String.class); + TestStateDescriptor<String> same = new TestStateDescriptor<>(name, String.class); + TestStateDescriptor<String> sameBySerializer = new TestStateDescriptor<>(name, StringSerializer.INSTANCE); + + // test that hashCode() works on state descriptors with initialized and uninitialized serializers + assertEquals(original.hashCode(), same.hashCode()); + assertEquals(original.hashCode(), sameBySerializer.hashCode()); + + assertEquals(original, same); + assertEquals(original, sameBySerializer); + + // equality with a clone + TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original); + assertEquals(original, clone); + + // equality with an initialized + clone.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, clone); + + original.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, same); + } + + @Test + public void testEqualsSameNameAndTypeDifferentClass() throws Exception { + final String name = "test name"; + + final TestStateDescriptor<String> descr1 = new TestStateDescriptor<>(name, String.class); + final OtherTestStateDescriptor<String> descr2 = new OtherTestStateDescriptor<>(name, String.class); + + assertNotEquals(descr1, descr2); + } + + // ------------------------------------------------------------------------ // Mock implementations and test types // ------------------------------------------------------------------------ @@ -185,17 +227,34 @@ public class StateDescriptorTest { @Override public Type getType() { - throw new UnsupportedOperationException(); + return Type.VALUE; + } + } + + private static class OtherTestStateDescriptor<T> extends StateDescriptor<State, T> { + + private static final long serialVersionUID = 1L; + + OtherTestStateDescriptor(String name, TypeSerializer<T> serializer) { + super(name, serializer, null); + } + + OtherTestStateDescriptor(String name, TypeInformation<T> typeInfo) { + super(name, typeInfo, null); + } + + OtherTestStateDescriptor(String name, Class<T> type) { + super(name, type, null); } @Override - public int hashCode() { - return 584523; + public State bind(StateBinder stateBinder) throws Exception { + throw new UnsupportedOperationException(); } @Override - public boolean equals(Object o) { - return o != null && o.getClass() == TestStateDescriptor.class; + public Type getType() { + return Type.VALUE; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java index 67114e5..3870da0 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.testutils.CommonTestUtils; @@ -36,6 +37,33 @@ import static org.junit.Assert.assertNotNull; public class ValueStateDescriptorTest extends TestLogger { @Test + public void testHashCodeEquals() throws Exception { + final String name = "testName"; + + ValueStateDescriptor<String> original = new ValueStateDescriptor<>(name, String.class); + ValueStateDescriptor<String> same = new ValueStateDescriptor<>(name, String.class); + ValueStateDescriptor<String> sameBySerializer = new ValueStateDescriptor<>(name, StringSerializer.INSTANCE); + + // test that hashCode() works on state descriptors with initialized and uninitialized serializers + assertEquals(original.hashCode(), same.hashCode()); + assertEquals(original.hashCode(), sameBySerializer.hashCode()); + + assertEquals(original, same); + assertEquals(original, sameBySerializer); + + // equality with a clone + ValueStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original); + assertEquals(original, clone); + + // equality with an initialized + clone.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, clone); + + original.initializeSerializerUnlessSet(new ExecutionConfig()); + assertEquals(original, same); + } + + @Test public void testVeryLargeDefaultValue() throws Exception { // ensure that we correctly read very large data when deserializing the default value
