This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push: new c77d37e5a TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality (#326) (Seonggon Namgung reviewed by Laszlo Bodor) c77d37e5a is described below commit c77d37e5ab45c883f0ad0d6e647540dcd9910c78 Author: seonggon <ln...@postech.ac.kr> AuthorDate: Wed Dec 25 17:44:56 2024 +0900 TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality (#326) (Seonggon Namgung reviewed by Laszlo Bodor) --- .../common/CompositeInputAttemptIdentifier.java | 11 ++++++++- .../library/common/InputAttemptIdentifier.java | 14 ++++++++++- .../shuffle/impl/ShuffleInputEventHandlerImpl.java | 1 + .../common/shuffle/impl/ShuffleManager.java | 27 +++++++++++++++++++--- .../shuffle/orderedgrouped/ShuffleScheduler.java | 14 ++++++++++- .../library/common/TestInputIdentifiers.java | 19 +++++++++++++++ 6 files changed, 80 insertions(+), 6 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java index 30295bd39..e07e68766 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.library.common; +import com.google.common.collect.Range; import org.apache.hadoop.classification.InterfaceAudience.Private; /** @@ -50,6 +51,14 @@ public class CompositeInputAttemptIdentifier extends InputAttemptIdentifier { return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId()); } + public boolean includes(InputAttemptIdentifier 
thatInputAttemptIdentifier) { + Range<Integer> inputRange = + Range.closedOpen(super.getInputIdentifier(), super.getInputIdentifier() + inputIdentifierCount); + + return inputRange.contains(thatInputAttemptIdentifier.getInputIdentifier()) && + super.getAttemptNumber() == thatInputAttemptIdentifier.getAttemptNumber(); + } + // PathComponent & shared does not need to be part of the hashCode and equals computation. @Override public int hashCode() { @@ -63,6 +72,6 @@ public class CompositeInputAttemptIdentifier extends InputAttemptIdentifier { @Override public String toString() { - return super.toString(); + return super.toString() + ", count=" + inputIdentifierCount; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java index 16172e1da..d1d5aeda1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java @@ -108,6 +108,18 @@ public class InputAttemptIdentifier { (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal()); } + /** + * Checks whether this InputAttemptIdentifier includes the given InputAttemptIdentifier. + * It is used when we obsolete InputAttemptIdentifiers that include a FetchFailure reported one. + * + * @param thatInputAttemptIdentifier The InputAttemptIdentifier to check for inclusion. + * @return True if the current identifier includes the given one, false otherwise. + */ + public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) { + return this.inputIdentifier == thatInputAttemptIdentifier.getInputIdentifier() && + this.attemptNumber == thatInputAttemptIdentifier.getAttemptNumber(); + } + // PathComponent & shared does not need to be part of the hashCode and equals computation. 
@Override public int hashCode() { @@ -139,6 +151,6 @@ public class InputAttemptIdentifier { public String toString() { return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier + ", attemptNumber=" + attemptNumber + ", pathComponent=" - + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId +"]"; + + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId + "]"; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 4f42f57a1..56b8cd4a0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -282,6 +282,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { private void processInputFailedEvent(InputFailedEvent ife) { InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion()); + LOG.info("Marking obsolete input: {} {}", inputContext.getSourceVertexName(), srcAttemptIdentifier); shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 769ac68f7..646194c6d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -583,8 +583,9 @@ public class ShuffleManager implements FetcherCallback { } else { alreadyCompleted = 
completedInputSet.get(input.getInputIdentifier()); } + // Avoid adding attempts which have already completed or have been marked as OBSOLETE - if (alreadyCompleted || obsoletedInputs.contains(input)) { + if (alreadyCompleted || isObsoleteInputAttemptIdentifier(input)) { inputIter.remove(); continue; } @@ -949,10 +950,14 @@ public class ShuffleManager implements FetcherCallback { // TODO NEWTEZ. Implement logic to report fetch failures after a threshold. // For now, reporting immediately. InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier(); + if (isObsoleteInputAttemptIdentifier(srcAttemptIdentifier)) { + LOG.info("Do not report obsolete input: " + srcAttemptIdentifier); + return; + } LOG.info( - "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, " + "{}: Fetch failed for InputIdentifier: {}, connectFailed: {}, " + "local fetch: {}, remote fetch failure reported as local failure: {})", - sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed, + sourceDestNameTrimmed, srcAttemptIdentifier, connectFailed, inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource()); failedShufflesCounter.increment(1); inputContext.notifyProgress(); @@ -984,6 +989,22 @@ public class ShuffleManager implements FetcherCallback { } } } + + private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { + if (input == null) { + return false; + } + InputAttemptIdentifier obsoleteInput; + Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoletedInputs.iterator(); + while (obsoleteInputsIter.hasNext()) { + obsoleteInput = obsoleteInputsIter.next(); + if (input.includes(obsoleteInput)) { + return true; + } + } + return false; + } + /////////////////// End of Methods from FetcherCallbackHandler public void shutdown() throws InterruptedException { diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index f68ab948b..3fc7d6305 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -1175,7 +1175,19 @@ class ShuffleScheduler { } else { isInputFinished = isInputFinished(id.getInputIdentifier()); } - return !obsoleteInputs.contains(id) && !isInputFinished; + return !isObsoleteInputAttemptIdentifier(id) && !isInputFinished; + } + + private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { + InputAttemptIdentifier obsoleteInput; + Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoleteInputs.iterator(); + while (obsoleteInputsIter.hasNext()) { + obsoleteInput = obsoleteInputsIter.next(); + if (input.includes(obsoleteInput)) { + return true; + } + } + return false; } public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) { diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java index 6b82a9d27..5eb3b5030 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java @@ -41,4 +41,23 @@ public class TestInputIdentifiers { Assert.assertTrue(set.add(i4)); } + @Test(timeout = 5000) + public void testInputAttemptIdentifierIncludes() { + InputAttemptIdentifier inputData0Attempt0 = new InputAttemptIdentifier(0, 0); + InputAttemptIdentifier inputData1Attempt0 = new InputAttemptIdentifier(1, 
0); + InputAttemptIdentifier inputData2Attempt0 = new InputAttemptIdentifier(2, 0); + InputAttemptIdentifier inputData3Attempt0 = new InputAttemptIdentifier(3, 0); + InputAttemptIdentifier inputData1Attempt1 = new InputAttemptIdentifier(1, 1); + CompositeInputAttemptIdentifier inputData12Attempt0 = new CompositeInputAttemptIdentifier(1, 0, null, 2); + + Assert.assertTrue(inputData1Attempt0.includes(inputData1Attempt0)); + Assert.assertFalse(inputData1Attempt0.includes(inputData2Attempt0)); + Assert.assertFalse(inputData1Attempt0.includes(inputData1Attempt1)); + + Assert.assertFalse(inputData12Attempt0.includes(inputData0Attempt0)); + Assert.assertTrue(inputData12Attempt0.includes(inputData1Attempt0)); + Assert.assertTrue(inputData12Attempt0.includes(inputData2Attempt0)); + Assert.assertFalse(inputData12Attempt0.includes(inputData3Attempt0)); + Assert.assertFalse(inputData12Attempt0.includes(inputData1Attempt1)); + } }