This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0be231972d5d687924db22ae81d6a10049f7c43e
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Fri Oct 27 11:01:49 2023 +0200

    [FLINK-33341][state] Refactoring: consolidate equals/hashCode/toString for incremental state handle classes.
---
 .../state/AbstractIncrementalStateHandle.java      | 36 +++++++++++
 .../state/IncrementalLocalKeyedStateHandle.java    | 35 +----------
 .../state/IncrementalRemoteKeyedStateHandle.java   | 69 ++--------------------
 3 files changed, 43 insertions(+), 97 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
index 8c7ea74c33c..85f12329f57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractIncrementalStateHandle.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import javax.annotation.Nonnull;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 /** Abstract superclass for all {@link IncrementalKeyedStateHandle}. */
@@ -103,4 +104,39 @@ public abstract class AbstractIncrementalStateHandle implements IncrementalKeyed
                 ? null
                 : this;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AbstractIncrementalStateHandle that = (AbstractIncrementalStateHandle) o;
+        return Objects.equals(stateHandleId, that.stateHandleId);
+    }
+
+    @Override
+    public int hashCode() {
+        return stateHandleId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "AbstractIncrementalStateHandle{"
+                + "checkpointId="
+                + checkpointId
+                + ", backendIdentifier="
+                + backendIdentifier
+                + ", keyGroupRange="
+                + keyGroupRange
+                + ", sharedState="
+                + sharedState
+                + ", metaStateHandle="
+                + metaStateHandle
+                + ", stateHandleId="
+                + stateHandleId
+                + '}';
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index f854c111c6e..ac457f00622 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -94,44 +94,13 @@ public class IncrementalLocalKeyedStateHandle extends AbstractIncrementalStateHa
         return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize();
     }
 
-    @Override
-    public int hashCode() {
-        int result = directoryStateHandle.hashCode();
-        result = 31 * result + getKeyGroupRange().hashCode();
-        result = 31 * result + getMetaDataStateHandle().hashCode();
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-
-        IncrementalLocalKeyedStateHandle that = (IncrementalLocalKeyedStateHandle) o;
-
-        return getKeyGroupRange().equals(that.keyGroupRange)
-                && getMetaDataStateHandle().equals(that.getMetaDataStateHandle());
-    }
-
     @Override
     public String toString() {
         return "IncrementalLocalKeyedStateHandle{"
-                + "metaDataState="
-                + metaStateHandle
-                + "} "
-                + "DirectoryKeyedStateHandle{"
                 + "directoryStateHandle="
                 + directoryStateHandle
-                + ", keyGroupRange="
-                + keyGroupRange
-                + '}';
+                + "} "
+                + super.toString();
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
index 86a5b59c168..41b9a0466cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
@@ -306,75 +306,16 @@ public class IncrementalRemoteKeyedStateHandle extends AbstractIncrementalStateH
                 stateHandleId);
     }
 
-    /**
-     * This method is should only be called in tests! This should never serve as key in a hash map.
-     */
-    @VisibleForTesting
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        IncrementalRemoteKeyedStateHandle that = (IncrementalRemoteKeyedStateHandle) o;
-
-        if (!getStateHandleId().equals(that.getStateHandleId())) {
-            return false;
-        }
-        if (getCheckpointId() != that.getCheckpointId()) {
-            return false;
-        }
-        if (!getBackendIdentifier().equals(that.getBackendIdentifier())) {
-            return false;
-        }
-        if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
-            return false;
-        }
-        if (!getSharedState().equals(that.getSharedState())) {
-            return false;
-        }
-        if (!getPrivateState().equals(that.getPrivateState())) {
-            return false;
-        }
-        return getMetaDataStateHandle().equals(that.getMetaDataStateHandle());
-    }
-
-    /** This method should only be called in tests! This should never serve as key in a hash map. */
-    @VisibleForTesting
-    @Override
-    public int hashCode() {
-        int result = getBackendIdentifier().hashCode();
-        result = 31 * result + getKeyGroupRange().hashCode();
-        result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
-        result = 31 * result + getSharedState().hashCode();
-        result = 31 * result + getPrivateState().hashCode();
-        result = 31 * result + getMetaDataStateHandle().hashCode();
-        result = 31 * result + getStateHandleId().hashCode();
-        return result;
-    }
-
     @Override
     public String toString() {
         return "IncrementalRemoteKeyedStateHandle{"
-                + "backendIdentifier="
-                + backendIdentifier
-                + ", stateHandleId="
-                + stateHandleId
-                + ", keyGroupRange="
-                + keyGroupRange
-                + ", checkpointId="
-                + checkpointId
-                + ", sharedState="
-                + sharedState
-                + ", privateState="
+                + "privateState="
                 + privateState
-                + ", metaStateHandle="
-                + metaStateHandle
+                + ", persistedSizeOfThisCheckpoint="
+                + persistedSizeOfThisCheckpoint
                 + ", registered="
                 + (sharedStateRegistry != null)
-                + '}';
+                + "} "
+                + super.toString();
     }
 }

Reply via email to