zhuzhurk commented on a change in pull request #11873:
URL: https://github.com/apache/flink/pull/11873#discussion_r467667602



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
##########
@@ -18,33 +18,77 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Unique identifier for the attempt to execute a tasks. Multiple attempts 
happen
  * in cases of failures and recovery.
  */
-public class ExecutionAttemptID extends AbstractID {
+public class ExecutionAttemptID implements java.io.Serializable {
 
        private static final long serialVersionUID = -1169683445778281344L;
 
+       private final ExecutionVertexID executionVertexID;
+       private final int attemptNumber;
+
+       /**
+        * Get a random execution attempt id.
+        */
        public ExecutionAttemptID() {
+               this(new ExecutionVertexID(), 0);
        }
 
-       public ExecutionAttemptID(long lowerPart, long upperPart) {
-               super(lowerPart, upperPart);
+       public ExecutionAttemptID(ExecutionVertexID executionVertexID, int 
attemptNumber) {
+               Preconditions.checkState(attemptNumber >= 0);
+               this.executionVertexID = 
Preconditions.checkNotNull(executionVertexID);
+               this.attemptNumber = attemptNumber;
        }
 
        public void writeTo(ByteBuf buf) {
-               buf.writeLong(this.lowerPart);
-               buf.writeLong(this.upperPart);
+               executionVertexID.writeTo(buf);
+               buf.writeInt(this.attemptNumber);
        }
 
        public static ExecutionAttemptID fromByteBuf(ByteBuf buf) {
-               long lower = buf.readLong();
-               long upper = buf.readLong();
-               return new ExecutionAttemptID(lower, upper);
+               final ExecutionVertexID executionVertexID = 
ExecutionVertexID.fromByteBuf(buf);
+               final int attemptNumber = buf.readInt();
+               return new ExecutionAttemptID(executionVertexID, attemptNumber);
+       }
+
+       @VisibleForTesting
+       public int getAttemptNumber() {
+               return attemptNumber;
+       }
+
+       @VisibleForTesting
+       public ExecutionVertexID getExecutionVertexID() {

Review comment:
       NIT: `getExecutionVertexId` would be better than `getExecutionVertexID`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
##########
@@ -18,33 +18,77 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Unique identifier for the attempt to execute a tasks. Multiple attempts 
happen
  * in cases of failures and recovery.
  */
-public class ExecutionAttemptID extends AbstractID {
+public class ExecutionAttemptID implements java.io.Serializable {
 
        private static final long serialVersionUID = -1169683445778281344L;
 
+       private final ExecutionVertexID executionVertexID;
+       private final int attemptNumber;
+
+       /**
+        * Get a random execution attempt id.
+        */
        public ExecutionAttemptID() {
+               this(new ExecutionVertexID(), 0);
        }
 
-       public ExecutionAttemptID(long lowerPart, long upperPart) {
-               super(lowerPart, upperPart);
+       public ExecutionAttemptID(ExecutionVertexID executionVertexID, int 
attemptNumber) {
+               Preconditions.checkState(attemptNumber >= 0);
+               this.executionVertexID = 
Preconditions.checkNotNull(executionVertexID);
+               this.attemptNumber = attemptNumber;
        }
 
        public void writeTo(ByteBuf buf) {
-               buf.writeLong(this.lowerPart);
-               buf.writeLong(this.upperPart);
+               executionVertexID.writeTo(buf);
+               buf.writeInt(this.attemptNumber);
        }
 
        public static ExecutionAttemptID fromByteBuf(ByteBuf buf) {
-               long lower = buf.readLong();
-               long upper = buf.readLong();
-               return new ExecutionAttemptID(lower, upper);
+               final ExecutionVertexID executionVertexID = 
ExecutionVertexID.fromByteBuf(buf);
+               final int attemptNumber = buf.readInt();
+               return new ExecutionAttemptID(executionVertexID, attemptNumber);
+       }
+
+       @VisibleForTesting
+       public int getAttemptNumber() {
+               return attemptNumber;
+       }
+
+       @VisibleForTesting
+       public ExecutionVertexID getExecutionVertexID() {
+               return executionVertexID;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj != null && obj.getClass() == getClass()) {
+                       ExecutionAttemptID that = (ExecutionAttemptID) obj;
+                       return 
that.executionVertexID.equals(this.executionVertexID)
+                               && that.attemptNumber == this.attemptNumber;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return this.executionVertexID.hashCode() ^ this.attemptNumber;

Review comment:
       I'd prefer `this.executionVertexID.hashCode() * 31 + this.attemptNumber`.
   This is because `attemptNumber  == 0` is very common and can result in the 
same hash code.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
##########
@@ -18,33 +18,77 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Unique identifier for the attempt to execute a tasks. Multiple attempts 
happen
  * in cases of failures and recovery.
  */
-public class ExecutionAttemptID extends AbstractID {
+public class ExecutionAttemptID implements java.io.Serializable {
 
        private static final long serialVersionUID = -1169683445778281344L;
 
+       private final ExecutionVertexID executionVertexID;
+       private final int attemptNumber;
+
+       /**
+        * Get a random execution attempt id.
+        */
        public ExecutionAttemptID() {

Review comment:
       This constructor is for tests only. Could we introduce a test util 
method `ExecutionGraphTestUtils#createRandomExecutionAttemptId()`? The returned 
id consists of a random `ExecutionVertexId` as well as a random 
`attemptNumber`. I think it better suites the tests which wants a random 
`ExecutionAttemptID`. And we can also avoid to add a test-only code path in 
production.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
##########
@@ -18,33 +18,77 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Unique identifier for the attempt to execute a tasks. Multiple attempts 
happen
  * in cases of failures and recovery.
  */
-public class ExecutionAttemptID extends AbstractID {
+public class ExecutionAttemptID implements java.io.Serializable {
 
        private static final long serialVersionUID = -1169683445778281344L;
 
+       private final ExecutionVertexID executionVertexID;

Review comment:
       NIT: `executionVertexId` would be better than `executionVertexID`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
##########
@@ -33,6 +35,10 @@
 
        private final int subtaskIndex;
 
+       public ExecutionVertexID() {

Review comment:
       Could we introduce a util method 
`ExecutionGraphTestUtils#createRandomExecutionVertexId()` to avoid pollute 
production code with test code? And make it true random with a random 
subtaskIndex as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to