This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5d6b69fbc46 [SPARK-41792][SHUFFLE] Fix DB update for push based
shuffle when newer shuffle merge is received
5d6b69fbc46 is described below
commit 5d6b69fbc46ea52852dbe86f144ab87495af64a8
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Sun Jan 1 13:41:35 2023 -0600
[SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer
shuffle merge is received
### What changes were proposed in this pull request?
An incorrect merge id is removed from the DB when a newer shuffle merge id is
received; this change removes the correct (older) merge id instead.
### Why are the changes needed?
Bug fix
### Does this PR introduce _any_ user-facing change?
No, this fixes a corner-case bug.
### How was this patch tested?
An existing unit test was updated.
Closes #39316 from mridulm/SPARK-41792.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../network/shuffle/RemoteBlockPushResolver.java | 18 ++++++++++--------
.../network/shuffle/RemoteBlockPushResolverSuite.java | 19 +++++++++++++++++++
.../spark/network/yarn/YarnShuffleServiceSuite.scala | 2 +-
3 files changed, 30 insertions(+), 9 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850..c3a2e9a883a 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -227,15 +227,15 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
// Higher shuffleMergeId seen for the shuffle ID meaning new stage
attempt is being
// run for the shuffle ID. Close and clean up old shuffleMergeId
files,
// happens in the indeterminate stage retries
- AppAttemptShuffleMergeId appAttemptShuffleMergeId =
- new AppAttemptShuffleMergeId(
- appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
shuffleMergeId);
+ AppAttemptShuffleMergeId currrentAppAttemptShuffleMergeId =
+ new AppAttemptShuffleMergeId(appShuffleInfo.appId,
appShuffleInfo.attemptId,
+ shuffleId, latestShuffleMergeId);
logger.info("{}: creating a new shuffle merge metadata since
received " +
- "shuffleMergeId is higher than latest shuffleMergeId {}",
- appAttemptShuffleMergeId, latestShuffleMergeId);
+ "shuffleMergeId {} is higher than latest shuffleMergeId {}",
+ currrentAppAttemptShuffleMergeId, shuffleMergeId,
latestShuffleMergeId);
submitCleanupTask(() ->
- closeAndDeleteOutdatedPartitions(
- appAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions));
+
closeAndDeleteOutdatedPartitions(currrentAppAttemptShuffleMergeId,
+ mergePartitionsInfo.shuffleMergePartitions));
return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
} else {
// The request is for block with same shuffleMergeId as the latest
shuffleMergeId
@@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
} else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId
then return
// empty MergeStatuses but cleanup the older shuffleMergeId files.
+ AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new
AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId,
mergePartitionsInfo.shuffleMergeId);
submitCleanupTask(() ->
closeAndDeleteOutdatedPartitions(
- appAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions));
+ currentAppAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions));
} else {
// This block covers:
// 1. finalization of determinate stage
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index eb2c1d9fa5c..6a595ee346d 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -28,7 +28,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
@@ -1146,11 +1148,14 @@ public class RemoteBlockPushResolverSuite {
@Test
public void testCleanupOlderShuffleMergeId() throws IOException,
InterruptedException {
Semaphore closed = new Semaphore(0);
+ List<RemoteBlockPushResolver.AppAttemptShuffleMergeId> removedIds =
+ new CopyOnWriteArrayList<>();
pushResolver = new RemoteBlockPushResolver(conf, null) {
@Override
void closeAndDeleteOutdatedPartitions(
AppAttemptShuffleMergeId appAttemptShuffleMergeId,
Map<Integer, AppShufflePartitionInfo> partitions) {
+ removedIds.add(appAttemptShuffleMergeId);
super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId,
partitions);
closed.release();
}
@@ -1167,6 +1172,10 @@ public class RemoteBlockPushResolverSuite {
RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
pushResolver.validateAndGetAppShuffleInfo(testApp);
closed.acquire();
+ assertEquals(1, removedIds.size());
+ // For the previous merge id
+ assertEquals(1, removedIds.iterator().next().shuffleMergeId);
+ removedIds.clear();
assertFalse("Data files on the disk should be cleaned up",
appShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
assertFalse("Meta files on the disk should be cleaned up",
@@ -1186,6 +1195,9 @@ public class RemoteBlockPushResolverSuite {
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 3, 0, 0, 0));
closed.acquire();
+ assertEquals(1, removedIds.size());
+ assertEquals(2, removedIds.iterator().next().shuffleMergeId);
+ removedIds.clear();
stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
stream3.onComplete(stream3.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp,
NO_ATTEMPT_ID, 0, 3));
@@ -1196,6 +1208,9 @@ public class RemoteBlockPushResolverSuite {
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0));
closed.acquire();
+ assertEquals(1, removedIds.size());
+ assertEquals(3, removedIds.iterator().next().shuffleMergeId);
+ removedIds.clear();
// Do not finalize shuffleMergeId 4 can happen during stage cancellation.
stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2]));
stream4.onComplete(stream4.getID());
@@ -1204,6 +1219,10 @@ public class RemoteBlockPushResolverSuite {
// but no blocks pushed for that shuffleMergeId
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp,
NO_ATTEMPT_ID, 0, 5));
closed.acquire();
+ assertEquals(1, removedIds.size());
+ // For the previous merge id - here the cleanup is from
finalizeShuffleMerge
+ assertEquals(4, removedIds.iterator().next().shuffleMergeId);
+ removedIds.clear();
assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4
should be cleaned"
+ " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists());
assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4
should be cleaned"
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 16fa4205692..075a21c399e 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -956,7 +956,7 @@ abstract class YarnShuffleServiceSuite extends
SparkFunSuite with Matchers {
assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
mergeManager2, mergeManager2DB) == 1)
assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload(
- mergeManager2, mergeManager2DB) == 2)
+ mergeManager2, mergeManager2DB) == 1)
s2.stop()
// Yarn Shuffle service comes back up without custom mergeManager
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]