This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new c77d37e5a TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality (#326) (Seonggon Namgung reviewed by Laszlo Bodor)
c77d37e5a is described below

commit c77d37e5ab45c883f0ad0d6e647540dcd9910c78
Author: seonggon <ln...@postech.ac.kr>
AuthorDate: Wed Dec 25 17:44:56 2024 +0900

    TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality (#326) (Seonggon Namgung reviewed by Laszlo Bodor)
---
 .../common/CompositeInputAttemptIdentifier.java    | 11 ++++++++-
 .../library/common/InputAttemptIdentifier.java     | 14 ++++++++++-
 .../shuffle/impl/ShuffleInputEventHandlerImpl.java |  1 +
 .../common/shuffle/impl/ShuffleManager.java        | 27 +++++++++++++++++++---
 .../shuffle/orderedgrouped/ShuffleScheduler.java   | 14 ++++++++++-
 .../library/common/TestInputIdentifiers.java       | 19 +++++++++++++++
 6 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java
index 30295bd39..e07e68766 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.library.common;
 
+import com.google.common.collect.Range;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 
 /**
@@ -50,6 +51,14 @@ public class CompositeInputAttemptIdentifier extends InputAttemptIdentifier {
     return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId());
   }
 
+  public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) {
+    Range<Integer> inputRange =
+        Range.closedOpen(super.getInputIdentifier(), super.getInputIdentifier() + inputIdentifierCount);
+
+    return inputRange.contains(thatInputAttemptIdentifier.getInputIdentifier()) &&
+        super.getAttemptNumber() == thatInputAttemptIdentifier.getAttemptNumber();
+  }
+
   // PathComponent & shared does not need to be part of the hashCode and equals computation.
   @Override
   public int hashCode() {
@@ -63,6 +72,6 @@ public class CompositeInputAttemptIdentifier extends InputAttemptIdentifier {
 
   @Override
   public String toString() {
-    return super.toString();
+    return super.toString() + ", count=" + inputIdentifierCount;
   }
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index 16172e1da..d1d5aeda1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -108,6 +108,18 @@ public class InputAttemptIdentifier {
         (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal());
   }
 
+  /**
+   * Checks whether this InputAttemptIdentifier includes the given InputAttemptIdentifier.
+   * It is used when we obsolete InputAttemptIdentifiers that include a FetchFailure reported one.
+   *
+   * @param thatInputAttemptIdentifier The InputAttemptIdentifier to check for inclusion.
+   * @return True if the current identifier includes the given one, false otherwise.
+   */
+  public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) {
+    return this.inputIdentifier == thatInputAttemptIdentifier.getInputIdentifier() &&
+        this.attemptNumber == thatInputAttemptIdentifier.getAttemptNumber();
+  }
+
   // PathComponent & shared does not need to be part of the hashCode and equals computation.
   @Override
   public int hashCode() {
@@ -139,6 +151,6 @@ public class InputAttemptIdentifier {
   public String toString() {
     return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
         + ", attemptNumber=" + attemptNumber + ", pathComponent="
-        + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId  +"]";
+        + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId + "]";
   }
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 4f42f57a1..56b8cd4a0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -282,6 +282,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
 
   private void processInputFailedEvent(InputFailedEvent ife) {
     InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
+    LOG.info("Marking obsolete input: {} {}", inputContext.getSourceVertexName(), srcAttemptIdentifier);
     shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
   }
 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 769ac68f7..646194c6d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -583,8 +583,9 @@ public class ShuffleManager implements FetcherCallback {
       } else {
           alreadyCompleted = completedInputSet.get(input.getInputIdentifier());
       }
+
      // Avoid adding attempts which have already completed or have been marked as OBSOLETE
-      if (alreadyCompleted || obsoletedInputs.contains(input)) {
+      if (alreadyCompleted || isObsoleteInputAttemptIdentifier(input)) {
         inputIter.remove();
         continue;
       }
@@ -949,10 +950,14 @@ public class ShuffleManager implements FetcherCallback {
     // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
     // For now, reporting immediately.
     InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier();
+    if (isObsoleteInputAttemptIdentifier(srcAttemptIdentifier)) {
+      LOG.info("Do not report obsolete input: " + srcAttemptIdentifier);
+      return;
+    }
     LOG.info(
-        "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, "
+        "{}: Fetch failed for InputIdentifier: {}, connectFailed: {}, "
             + "local fetch: {}, remote fetch failure reported as local failure: {})",
-        sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed,
+        sourceDestNameTrimmed, srcAttemptIdentifier, connectFailed,
         inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource());
     failedShufflesCounter.increment(1);
     inputContext.notifyProgress();
@@ -984,6 +989,22 @@ public class ShuffleManager implements FetcherCallback {
       }
     }
   }
+
+  private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) {
+    if (input == null) {
+      return false;
+    }
+    InputAttemptIdentifier obsoleteInput;
+    Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoletedInputs.iterator();
+    while (obsoleteInputsIter.hasNext()) {
+      obsoleteInput = obsoleteInputsIter.next();
+      if (input.includes(obsoleteInput)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /////////////////// End of Methods from FetcherCallbackHandler
 
   public void shutdown() throws InterruptedException {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index f68ab948b..3fc7d6305 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -1175,7 +1175,19 @@ class ShuffleScheduler {
     } else {
       isInputFinished = isInputFinished(id.getInputIdentifier());
     }
-    return !obsoleteInputs.contains(id) && !isInputFinished;
+    return !isObsoleteInputAttemptIdentifier(id) && !isInputFinished;
+  }
+
+  private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) {
+    InputAttemptIdentifier obsoleteInput;
+    Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoleteInputs.iterator();
+    while (obsoleteInputsIter.hasNext()) {
+      obsoleteInput = obsoleteInputsIter.next();
+      if (input.includes(obsoleteInput)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
index 6b82a9d27..5eb3b5030 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
@@ -41,4 +41,23 @@ public class TestInputIdentifiers {
     Assert.assertTrue(set.add(i4));
   }
 
+  @Test(timeout = 5000)
+  public void testInputAttemptIdentifierIncludes() {
+    InputAttemptIdentifier inputData0Attempt0 = new InputAttemptIdentifier(0, 0);
+    InputAttemptIdentifier inputData1Attempt0 = new InputAttemptIdentifier(1, 0);
+    InputAttemptIdentifier inputData2Attempt0 = new InputAttemptIdentifier(2, 0);
+    InputAttemptIdentifier inputData3Attempt0 = new InputAttemptIdentifier(3, 0);
+    InputAttemptIdentifier inputData1Attempt1 = new InputAttemptIdentifier(1, 1);
+    CompositeInputAttemptIdentifier inputData12Attempt0 = new CompositeInputAttemptIdentifier(1, 0, null, 2);
+
+    Assert.assertTrue(inputData1Attempt0.includes(inputData1Attempt0));
+    Assert.assertFalse(inputData1Attempt0.includes(inputData2Attempt0));
+    Assert.assertFalse(inputData1Attempt0.includes(inputData1Attempt1));
+
+    Assert.assertFalse(inputData12Attempt0.includes(inputData0Attempt0));
+    Assert.assertTrue(inputData12Attempt0.includes(inputData1Attempt0));
+    Assert.assertTrue(inputData12Attempt0.includes(inputData2Attempt0));
+    Assert.assertFalse(inputData12Attempt0.includes(inputData3Attempt0));
+    Assert.assertFalse(inputData12Attempt0.includes(inputData1Attempt1));
+  }
 }

Reply via email to