This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9afb407 [SPARK-37675][SPARK-37793] Prevent overwriting of push
shuffle merged files once the shuffle is finalized
9afb407 is described below
commit 9afb407fa7aaf2f0961661b5d8cfbec549e591ee
Author: Chandni Singh <[email protected]>
AuthorDate: Fri Jan 28 21:12:28 2022 -0600
[SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files
once the shuffle is finalized
### What changes were proposed in this pull request?
This fixes the bugs that were reported in SPARK-37675 and SPARK-37793.
- Empty merged partitions were reported by the shuffle server to the driver.
- The push merged files were getting overwritten after a shuffle merge is
finalized.
- Throwing exception in the finalization of a shuffle for which the shuffle
server didn't receive any blocks.
### Why are the changes needed?
Changes are need to fix the bug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Have added unit test.
Closes #35325 from otterc/SPARK-37675.
Authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../network/shuffle/RemoteBlockPushResolver.java | 171 ++++++++++-----------
.../shuffle/RemoteBlockPushResolverSuite.java | 77 +++++++++-
2 files changed, 158 insertions(+), 90 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d626cc3..b823076 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -83,20 +83,11 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
public static final String MERGE_DIR_KEY = "mergeDir";
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;
- // Shuffles of determinate stages will have shuffleMergeId set to 0
- private static final int DETERMINATE_SHUFFLE_MERGE_ID = 0;
private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER =
createErrorHandler();
// ByteBuffer to respond to client upon a successful merge of a pushed block
private static final ByteBuffer SUCCESS_RESPONSE =
new BlockPushReturnCode(ReturnCode.SUCCESS.id(),
"").toByteBuffer().asReadOnlyBuffer();
- // ConcurrentHashMap doesn't allow null for keys or values which is why this
is required.
- // Marker to identify finalized indeterminate shuffle partitions in the case
of indeterminate
- // stage retries.
- @VisibleForTesting
- public static final Map<Integer, AppShufflePartitionInfo>
INDETERMINATE_SHUFFLE_FINALIZED =
- Collections.emptyMap();
-
/**
* A concurrent hashmap where the key is the applicationId, and the value
includes
* all the merged shuffle information for this application. AppShuffleInfo
stores
@@ -169,59 +160,45 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
String blockId) throws BlockPushNonFatalFailure {
ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles =
appShuffleInfo.shuffles;
AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
- shuffles.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
- if (appShuffleMergePartitionsInfo == null) {
- File dataFile =
- appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId,
reduceId);
- // If this partition is already finalized then the partitions map
will not contain the
- // shuffleId for determinate stages but the data file would exist.
- // In that case the block is considered late. In the case of
indeterminate stages, most
- // recent shuffleMergeId finalized would be pointing to
INDETERMINATE_SHUFFLE_FINALIZED
- if (dataFile.exists()) {
- throw new BlockPushNonFatalFailure(new BlockPushReturnCode(
- ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
- BlockPushNonFatalFailure.getErrorMsg(blockId,
ReturnCode.TOO_LATE_BLOCK_PUSH));
- } else {
- logger.info("Creating a new attempt for shuffle blocks push
request for shuffle {}"
- + " with shuffleMergeId {} for application {}_{}", shuffleId,
shuffleMergeId,
- appShuffleInfo.appId, appShuffleInfo.attemptId);
- return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
- }
+ shuffles.compute(shuffleId, (id, mergePartitionsInfo) -> {
+ if (mergePartitionsInfo == null) {
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a
new shuffle " +
+ "merge metadata", appShuffleInfo.appId,
appShuffleInfo.attemptId, shuffleId,
+ shuffleMergeId);
+ return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
} else {
- // Reject the request as we have already seen a higher
shuffleMergeId than the
- // current incoming one
- int latestShuffleMergeId =
appShuffleMergePartitionsInfo.shuffleMergeId;
+ int latestShuffleMergeId = mergePartitionsInfo.shuffleMergeId;
if (latestShuffleMergeId > shuffleMergeId) {
+ // Reject the request as we have already seen a higher
shuffleMergeId than the one
+ // in the current request.
throw new BlockPushNonFatalFailure(
new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(),
blockId).toByteBuffer(),
BlockPushNonFatalFailure.getErrorMsg(blockId,
ReturnCode.STALE_BLOCK_PUSH));
- } else if (latestShuffleMergeId == shuffleMergeId) {
- return appShuffleMergePartitionsInfo;
- } else {
+ } else if (latestShuffleMergeId < shuffleMergeId){
// Higher shuffleMergeId seen for the shuffle ID meaning new stage
attempt is being
// run for the shuffle ID. Close and clean up old shuffleMergeId
files,
// happens in the indeterminate stage retries
- logger.info("Creating a new attempt for shuffle blocks push
request for shuffle {}"
- + " with shuffleMergeId {} for application {}_{} since it is
higher than the"
- + " latest shuffleMergeId {} already seen", shuffleId,
shuffleMergeId,
- appShuffleInfo.appId, appShuffleInfo.attemptId,
latestShuffleMergeId);
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a
new shuffle " +
+ "merge metadata since received shuffleMergeId is higher than
latest " +
+ "shuffleMergeId {}", appShuffleInfo.appId,
appShuffleInfo.attemptId, shuffleId,
+ shuffleMergeId, latestShuffleMergeId);
mergedShuffleCleaner.execute(() ->
-
closeAndDeletePartitionFiles(appShuffleMergePartitionsInfo.shuffleMergePartitions));
+
closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
+ } else {
+ // The request is for block with same shuffleMergeId as the latest
shuffleMergeId
+ if (mergePartitionsInfo.isFinalized()) {
+ throw new BlockPushNonFatalFailure(
+ new BlockPushReturnCode(
+ ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
blockId).toByteBuffer(),
+ BlockPushNonFatalFailure.getErrorMsg(blockId,
ReturnCode.TOO_LATE_BLOCK_PUSH));
+ }
+ return mergePartitionsInfo;
}
}
});
-
- // It only gets here when the shuffle is already finalized.
- if (null == shufflePartitionsWithMergeId ||
- INDETERMINATE_SHUFFLE_FINALIZED ==
shufflePartitionsWithMergeId.shuffleMergePartitions) {
- throw new BlockPushNonFatalFailure(
- new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
blockId).toByteBuffer(),
- BlockPushNonFatalFailure.getErrorMsg(blockId,
ReturnCode.TOO_LATE_BLOCK_PUSH));
- }
-
Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
- shufflePartitionsWithMergeId.shuffleMergePartitions;
+ shufflePartitionsWithMergeId.shuffleMergePartitions;
return shuffleMergePartitions.computeIfAbsent(reduceId, key -> {
// It only gets here when the key is not present in the map. The first
time the merge
// manager receives a pushed block for a given application shuffle
partition.
@@ -235,9 +212,9 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId,
shuffleMergeId,
reduceId, dataFile, indexFile, metaFile);
} catch (IOException e) {
- logger.error(
- "Cannot create merged shuffle partition with data file {}, index
file {}, and "
- + "meta file {}", dataFile.getAbsolutePath(),
+ logger.error("{} attempt {} shuffle {} shuffleMerge {}: cannot create
merged shuffle " +
+ "partition with data file {}, index file {}, and meta file {}",
appShuffleInfo.appId,
+ appShuffleInfo.attemptId, shuffleId, shuffleMergeId,
dataFile.getAbsolutePath(),
indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
throw new RuntimeException(
String.format("Cannot initialize merged shuffle partition for appId
%s shuffleId %s "
@@ -350,6 +327,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
* If cleanupLocalDirs is true, the merged shuffle files will also be
deleted.
* The cleanup will be executed in a separate thread.
*/
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@VisibleForTesting
void closeAndDeletePartitionFilesIfNeeded(
AppShuffleInfo appShuffleInfo,
@@ -512,10 +490,11 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
}
}
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
- logger.info("Finalizing shuffle {} with shuffleMergeId {} from Application
{}_{}.",
- msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalize shuffle
merge",
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
// If finalizeShuffleMerge from a former application attempt, it is
considered late,
@@ -534,35 +513,33 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>>
shuffleMergePartitionsRef =
new AtomicReference<>(null);
- // Metadata of the determinate stage shuffle can be safely removed as part
of finalizing
- // shuffle merge. Currently once the shuffle is finalized for a
determinate stages, retry
- // stages of the same shuffle will have shuffle push disabled.
- if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
- AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
- appShuffleInfo.shuffles.remove(msg.shuffleId);
- if (appShuffleMergePartitionsInfo != null) {
-
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
- }
- } else {
- appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> {
- if (null == value || msg.shuffleMergeId < value.shuffleMergeId ||
- INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (null != mergePartitionsInfo) {
+ if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId ||
+ mergePartitionsInfo.isFinalized()) {
throw new RuntimeException(String.format(
- "Shuffle merge finalize request for shuffle %s with" + "
shuffleMergeId %s is %s",
- msg.shuffleId, msg.shuffleMergeId,
- ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
- } else if (msg.shuffleMergeId > value.shuffleMergeId) {
+ "Shuffle merge finalize request for shuffle %s with" + "
shuffleMergeId %s is %s",
+ msg.shuffleId, msg.shuffleMergeId,
+
ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+ } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId
then return
// empty MergeStatuses but cleanup the older shuffleMergeId files.
mergedShuffleCleaner.execute(() ->
- closeAndDeletePartitionFiles(value.shuffleMergePartitions));
- return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+
closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
} else {
- shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
- return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ // This block covers:
+ // 1. finalization of determinate stage
+ // 2. finalization of indeterminate stage if the shuffleMergeId
related to it is the one
+ // for which the message is received.
+
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
}
- });
- }
+ }
+ // Even when the mergePartitionsInfo is null, we mark the shuffle as
finalized but the results
+ // sent to the driver will be empty. This cam happen when the service
didn't receive any
+ // blocks for the shuffle yet and the driver didn't wait for enough time
to finalize the
+ // shuffle.
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ });
Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
shuffleMergePartitionsRef.get();
MergeStatuses mergeStatuses;
if (null == shuffleMergePartitions || shuffleMergePartitions.isEmpty()) {
@@ -576,14 +553,25 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
for (AppShufflePartitionInfo partition: shuffleMergePartitions.values())
{
synchronized (partition) {
try {
+ logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalizing
shuffle " +
+ "partition {} ", msg.appId, msg.appAttemptId, msg.shuffleId,
+ msg.shuffleMergeId, partition.reduceId);
// This can throw IOException which will marks this shuffle
partition as not merged.
partition.finalizePartition();
- bitmaps.add(partition.mapTracker);
- reduceIds.add(partition.reduceId);
- sizes.add(partition.getLastChunkOffset());
+ if (partition.mapTracker.getCardinality() > 0) {
+ bitmaps.add(partition.mapTracker);
+ reduceIds.add(partition.reduceId);
+ sizes.add(partition.getLastChunkOffset());
+ logger.debug("{} attempt {} shuffle {} shuffleMerge {}:
finalization results " +
+ "added for partition {} data size {} index size {} meta size
{}",
+ msg.appId, msg.appAttemptId, msg.shuffleId,
+ msg.shuffleMergeId, partition.reduceId,
partition.getLastChunkOffset(),
+ partition.indexFile.getPos(), partition.metaFile.getPos());
+ }
} catch (IOException ioe) {
- logger.warn("Exception while finalizing shuffle partition {}_{} {}
{}", msg.appId,
- msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
+ logger.warn("{} attempt {} shuffle {} shuffleMerge {}: exception
while " +
+ "finalizing shuffle partition {}", msg.appId,
msg.appAttemptId, msg.shuffleId,
+ msg.shuffleMergeId, partition.reduceId);
} finally {
partition.closeAllFilesAndDeleteIfNeeded(false);
}
@@ -593,8 +581,8 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
bitmaps.toArray(new RoaringBitmap[bitmaps.size()]),
Ints.toArray(reduceIds),
Longs.toArray(sizes));
}
- logger.info("Finalized shuffle {} with shuffleMergeId {} from Application
{}_{}.",
- msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+ logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of
shuffle merge completed",
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
return mergeStatuses;
}
@@ -808,7 +796,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
int reduceId) {
return null == appShuffleMergePartitionsInfo ||
- INDETERMINATE_SHUFFLE_FINALIZED ==
appShuffleMergePartitionsInfo.shuffleMergePartitions ||
+ appShuffleMergePartitionsInfo.isFinalized() ||
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
}
@@ -1008,20 +996,27 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
* required for the shuffles of indeterminate stages.
*/
public static class AppShuffleMergePartitionsInfo {
+ // ConcurrentHashMap doesn't allow null for keys or values which is why
this is required.
+ // Marker to identify finalized shuffle partitions.
+ private static final Map<Integer, AppShufflePartitionInfo>
SHUFFLE_FINALIZED_MARKER =
+ Collections.emptyMap();
private final int shuffleMergeId;
private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
- public AppShuffleMergePartitionsInfo(
- int shuffleMergeId, boolean shuffleFinalized) {
+ public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean
shuffleFinalized) {
this.shuffleMergeId = shuffleMergeId;
- this.shuffleMergePartitions = shuffleFinalized ?
- INDETERMINATE_SHUFFLE_FINALIZED : new ConcurrentHashMap<>();
+ this.shuffleMergePartitions = shuffleFinalized ?
SHUFFLE_FINALIZED_MARKER :
+ new ConcurrentHashMap<>();
}
@VisibleForTesting
public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
return shuffleMergePartitions;
}
+
+ public boolean isFinalized() {
+ return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER;
+ }
}
/** Metadata tracked for an actively merged shuffle partition */
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index f4a29aa..5954733 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -1161,8 +1161,8 @@ public class RemoteBlockPushResolverSuite {
RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
- assertTrue("Metadata of determinate shuffle should be removed after
finalize shuffle"
- + " merge", appShuffleInfo.getShuffles().get(0) == null);
+ assertTrue("Determinate shuffle should be marked finalized",
+ appShuffleInfo.getShuffles().get(0).isFinalized());
validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new
int[][]{{0}, {1}});
@@ -1287,6 +1287,79 @@ public class RemoteBlockPushResolverSuite {
+ " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists());
}
+ @Test
+ public void
testFinalizationResultIsEmptyWhenTheServerDidNotReceiveAnyBlocks() {
+ //shuffle 1 0 is finalized even though the server didn't receive any
blocks for it.
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+ assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+ RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+ pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
+ assertTrue("shuffle 1 should be marked finalized",
+ appShuffleInfo.getShuffles().get(1).isFinalized());
+ removeApplication(TEST_APP);
+ }
+
+ // Test for SPARK-37675 and SPARK-37793
+ @Test
+ public void testEmptyMergePartitionsAreNotReported() throws IOException {
+ //shufflePush_1_0_0_100 is received by the server
+ StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+ //shuffle 1 0 is finalized
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+ assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+ removeApplication(TEST_APP);
+ }
+
+ // Test for SPARK-37675 and SPARK-37793
+ @Test
+ public void testAllBlocksAreRejectedWhenReceivedAfterFinalization() throws
IOException {
+ //shufflePush_1_0_0_100 is received by the server
+ StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+ stream1.onComplete(stream1.getID());
+ //shuffle 1 0 is finalized
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 1, 0));
+ BlockPushNonFatalFailure errorToValidate = null;
+ try {
+ //shufflePush_1_0_0_200 is received by the server after finalization of
shuffle 1 0 which
+ //should be rejected
+ StreamCallbackWithID failureCallback =
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 200, 0));
+ failureCallback.onComplete(failureCallback.getID());
+ } catch (BlockPushNonFatalFailure e) {
+ BlockPushReturnCode errorCode =
+ (BlockPushReturnCode)
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+ errorCode.returnCode);
+ errorToValidate = e;
+ assertEquals(errorCode.failureBlockId, "shufflePush_1_0_0_200");
+ }
+ assertNotNull("shufflePush_1_0_0_200 should be rejected", errorToValidate);
+ try {
+ //shufflePush_1_0_1_100 is received by the server after finalization of
shuffle 1 0 which
+ //should also be rejected
+ StreamCallbackWithID failureCallback =
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 1, 100, 0));
+ failureCallback.onComplete(failureCallback.getID());
+ } catch (BlockPushNonFatalFailure e) {
+ BlockPushReturnCode errorCode =
+ (BlockPushReturnCode)
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+ errorCode.returnCode);
+ errorToValidate = e;
+ assertEquals(errorCode.failureBlockId, "shufflePush_1_0_1_100");
+ }
+ assertNotNull("shufflePush_1_0_1_100 should be rejected", errorToValidate);
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 1,
0, 100);
+ validateChunks(TEST_APP, 1, 0, 100, blockMeta, new int[]{4}, new
int[][]{{0}});
+ removeApplication(TEST_APP);
+ }
+
private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile)
throws IOException {
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]