[FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3a51971
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3a51971
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3a51971

Branch: refs/heads/master
Commit: f3a519712fb31f7b71181e876c3c3d5fff08eb71
Parents: 7667ddc
Author: Stephan Ewen <[email protected]>
Authored: Tue Mar 20 17:16:06 2018 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Mar 22 15:46:09 2018 +0100

----------------------------------------------------------------------
 .../state/AggregatingStateDescriptor.java       | 31 ---------
 .../common/state/FoldingStateDescriptor.java    | 31 ---------
 .../api/common/state/ListStateDescriptor.java   | 30 ---------
 .../api/common/state/MapStateDescriptor.java    | 29 --------
 .../common/state/ReducingStateDescriptor.java   | 30 ---------
 .../flink/api/common/state/StateDescriptor.java | 17 ++++-
 .../api/common/state/ValueStateDescriptor.java  | 31 ---------
 .../common/state/ListStateDescriptorTest.java   | 28 ++++++++
 .../common/state/MapStateDescriptorTest.java    | 29 ++++++++
 .../state/ReducingStateDescriptorTest.java      | 29 ++++++++
 .../api/common/state/StateDescriptorTest.java   | 69 ++++++++++++++++++--
 .../common/state/ValueStateDescriptorTest.java  | 28 ++++++++
 12 files changed, 193 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 6f6d2f9..8c7fed6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -111,35 +111,4 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
        public Type getType() {
                return Type.AGGREGATING;
        }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               else if (o != null && getClass() == o.getClass()) {
-                       AggregatingStateDescriptor<?, ?, ?> that = 
(AggregatingStateDescriptor<?, ?, ?>) o;
-                       return serializer.equals(that.serializer) && 
name.equals(that.name);
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "AggregatingStateDescriptor{" +
-                               "serializer=" + serializer +
-                               ", aggFunction=" + aggFunction +
-                               '}';
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 261d1fe..c14e4bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -112,37 +112,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
        }
 
        @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               FoldingStateDescriptor<?, ?> that = (FoldingStateDescriptor<?, 
?>) o;
-
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "FoldingStateDescriptor{" +
-                               "serializer=" + serializer +
-                               ", initialValue=" + defaultValue +
-                               ", foldFunction=" + foldFunction +
-                               '}';
-       }
-
-       @Override
        public Type getType() {
                return Type.FOLDING;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 38e5680..aa5e64b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -102,34 +102,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
        public Type getType() {
                return Type.LIST;
        }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "ListStateDescriptor{" +
-                               "serializer=" + serializer +
-                               '}';
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 087cb54..42b016a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -117,33 +117,4 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 
                return ((MapSerializer<UK, UV>) 
rawSerializer).getValueSerializer();
        }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o;
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-       }
-
-       @Override
-       public String toString() {
-               return "MapStateDescriptor{" +
-                               "name=" + name +
-                               ", serializer=" + serializer +
-                               '}';
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index ef483e2..0df1c2c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -98,36 +98,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
        }
 
        @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) 
o;
-
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "ReducingStateDescriptor{" +
-                               "serializer=" + serializer +
-                               ", reduceFunction=" + reduceFunction +
-                               '}';
-       }
-
-       @Override
        public Type getType() {
                return Type.REDUCING;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 574c836..9b6b51d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -273,10 +273,23 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
        // ------------------------------------------------------------------------
 
        @Override
-       public abstract int hashCode();
+       public final int hashCode() {
+               return name.hashCode() + 31 * getClass().hashCode();
+       }
 
        @Override
-       public abstract boolean equals(Object o);
+       public final boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               else if (o != null && o.getClass() == this.getClass()) {
+                       final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
+                       return this.name.equals(that.name);
+               }
+               else {
+                       return false;
+               }
+       }
 
        @Override
        public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index ef18d74..4d69d81 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -130,37 +130,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
        }
 
        @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
-
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "ValueStateDescriptor{" +
-                               "name=" + name +
-                               ", defaultValue=" + defaultValue +
-                               ", serializer=" + serializer +
-                               '}';
-       }
-
-       @Override
        public Type getType() {
                return Type.VALUE;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index b934ee0..cb6f608 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
@@ -62,6 +63,33 @@ public class ListStateDescriptorTest {
                assertEquals(serializer, copy.getElementSerializer());
        }
 
+       @Test
+       public void testHashCodeEquals() throws Exception {
+               final String name = "testName";
+
+               ListStateDescriptor<String> original = new 
ListStateDescriptor<>(name, String.class);
+               ListStateDescriptor<String> same = new 
ListStateDescriptor<>(name, String.class);
+               ListStateDescriptor<String> sameBySerializer = new 
ListStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+               // test that hashCode() works on state descriptors with 
initialized and uninitialized serializers
+               assertEquals(original.hashCode(), same.hashCode());
+               assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+               assertEquals(original, same);
+               assertEquals(original, sameBySerializer);
+
+               // equality with a clone
+               ListStateDescriptor<String> clone = 
CommonTestUtils.createCopySerializable(original);
+               assertEquals(original, clone);
+
+               // equality with an initialized
+               clone.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, clone);
+
+               original.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, same);
+       }
+
        /**
         * FLINK-6775.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index 4e64c0f..069d6c2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
@@ -67,6 +68,34 @@ public class MapStateDescriptorTest {
                assertEquals(valueSerializer, copy.getValueSerializer());
        }
 
+       @Test
+       public void testHashCodeEquals() throws Exception {
+               final String name = "testName";
+
+               MapStateDescriptor<String, String> original = new 
MapStateDescriptor<>(name, String.class, String.class);
+               MapStateDescriptor<String, String> same = new 
MapStateDescriptor<>(name, String.class, String.class);
+               MapStateDescriptor<String, String> sameBySerializer =
+                               new MapStateDescriptor<>(name, 
StringSerializer.INSTANCE, StringSerializer.INSTANCE);
+
+               // test that hashCode() works on state descriptors with 
initialized and uninitialized serializers
+               assertEquals(original.hashCode(), same.hashCode());
+               assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+               assertEquals(original, same);
+               assertEquals(original, sameBySerializer);
+
+               // equality with a clone
+               MapStateDescriptor<String, String> clone = 
CommonTestUtils.createCopySerializable(original);
+               assertEquals(original, clone);
+
+               // equality with an initialized
+               clone.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, clone);
+
+               original.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, same);
+       }
+
        /**
         * FLINK-6775.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 5d9eba5..89aa1e6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
@@ -56,4 +57,32 @@ public class ReducingStateDescriptorTest extends TestLogger {
                assertNotNull(copy.getSerializer());
                assertEquals(serializer, copy.getSerializer());
        }
+
+       @Test
+       public void testHashCodeEquals() throws Exception {
+               final String name = "testName";
+               final ReduceFunction<String> reducer = (a, b) -> a;
+
+               ReducingStateDescriptor<String> original = new 
ReducingStateDescriptor<>(name, reducer, String.class);
+               ReducingStateDescriptor<String> same = new 
ReducingStateDescriptor<>(name, reducer, String.class);
+               ReducingStateDescriptor<String> sameBySerializer = new 
ReducingStateDescriptor<>(name, reducer, StringSerializer.INSTANCE);
+
+               // test that hashCode() works on state descriptors with 
initialized and uninitialized serializers
+               assertEquals(original.hashCode(), same.hashCode());
+               assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+               assertEquals(original, same);
+               assertEquals(original, sameBySerializer);
+
+               // equality with a clone
+               ReducingStateDescriptor<String> clone = 
CommonTestUtils.createCopySerializable(original);
+               assertEquals(original, clone);
+
+               // equality with an initialized
+               clone.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, clone);
+
+               original.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, same);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index cf5327e..3958baa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -32,6 +32,7 @@ import java.io.File;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
@@ -159,6 +160,47 @@ public class StateDescriptorTest {
        }
 
        // 
------------------------------------------------------------------------
+       //  Test hashCode() and equals()
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testHashCodeAndEquals() throws Exception {
+               final String name = "testName";
+
+               TestStateDescriptor<String> original = new 
TestStateDescriptor<>(name, String.class);
+               TestStateDescriptor<String> same = new 
TestStateDescriptor<>(name, String.class);
+               TestStateDescriptor<String> sameBySerializer = new 
TestStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+               // test that hashCode() works on state descriptors with 
initialized and uninitialized serializers
+               assertEquals(original.hashCode(), same.hashCode());
+               assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+               assertEquals(original, same);
+               assertEquals(original, sameBySerializer);
+
+               // equality with a clone
+               TestStateDescriptor<String> clone = 
CommonTestUtils.createCopySerializable(original);
+               assertEquals(original, clone);
+
+               // equality with an initialized
+               clone.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, clone);
+
+               original.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, same);
+       }
+
+       @Test
+       public void testEqualsSameNameAndTypeDifferentClass() throws Exception {
+               final String name = "test name";
+
+               final TestStateDescriptor<String> descr1 = new 
TestStateDescriptor<>(name, String.class);
+               final OtherTestStateDescriptor<String> descr2 = new 
OtherTestStateDescriptor<>(name, String.class);
+
+               assertNotEquals(descr1, descr2);
+       }
+
+       // 
------------------------------------------------------------------------
        //  Mock implementations and test types
        // 
------------------------------------------------------------------------
 
@@ -185,17 +227,34 @@ public class StateDescriptorTest {
 
                @Override
                public Type getType() {
-                       throw new UnsupportedOperationException();
+                       return Type.VALUE;
+               }
+       }
+
+       private static class OtherTestStateDescriptor<T> extends 
StateDescriptor<State, T> {
+
+               private static final long serialVersionUID = 1L;
+
+               OtherTestStateDescriptor(String name, TypeSerializer<T> 
serializer) {
+                       super(name, serializer, null);
+               }
+
+               OtherTestStateDescriptor(String name, TypeInformation<T> 
typeInfo) {
+                       super(name, typeInfo, null);
+               }
+
+               OtherTestStateDescriptor(String name, Class<T> type) {
+                       super(name, type, null);
                }
 
                @Override
-               public int hashCode() {
-                       return 584523;
+               public State bind(StateBinder stateBinder) throws Exception {
+                       throw new UnsupportedOperationException();
                }
 
                @Override
-               public boolean equals(Object o) {
-                       return o != null && o.getClass() == 
TestStateDescriptor.class;
+               public Type getType() {
+                       return Type.VALUE;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a51971/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 67114e5..3870da0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.CommonTestUtils;
@@ -36,6 +37,33 @@ import static org.junit.Assert.assertNotNull;
 public class ValueStateDescriptorTest extends TestLogger {
 
        @Test
+       public void testHashCodeEquals() throws Exception {
+               final String name = "testName";
+
+               ValueStateDescriptor<String> original = new 
ValueStateDescriptor<>(name, String.class);
+               ValueStateDescriptor<String> same = new 
ValueStateDescriptor<>(name, String.class);
+               ValueStateDescriptor<String> sameBySerializer = new 
ValueStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+               // test that hashCode() works on state descriptors with 
initialized and uninitialized serializers
+               assertEquals(original.hashCode(), same.hashCode());
+               assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+               assertEquals(original, same);
+               assertEquals(original, sameBySerializer);
+
+               // equality with a clone
+               ValueStateDescriptor<String> clone = 
CommonTestUtils.createCopySerializable(original);
+               assertEquals(original, clone);
+
+               // equality with an initialized
+               clone.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, clone);
+
+               original.initializeSerializerUnlessSet(new ExecutionConfig());
+               assertEquals(original, same);
+       }
+
+       @Test
        public void testVeryLargeDefaultValue() throws Exception {
                // ensure that we correctly read very large data when 
deserializing the default value
 

Reply via email to