This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9afb407  [SPARK-37675][SPARK-37793] Prevent overwriting of push 
shuffle merged files once the shuffle is finalized
9afb407 is described below

commit 9afb407fa7aaf2f0961661b5d8cfbec549e591ee
Author: Chandni Singh <[email protected]>
AuthorDate: Fri Jan 28 21:12:28 2022 -0600

    [SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files 
once the shuffle is finalized
    
    ### What changes were proposed in this pull request?
    This fixes the bugs that were reported in SPARK-37675 and SPARK-37793.
    - Empty merged partitions were reported by the shuffle server to the driver.
    - The push merged files were getting overwritten after a shuffle merge is 
finalized.
    - Throwing exception in the finalization of a shuffle for which the shuffle 
server didn't receive any blocks.
    
    ### Why are the changes needed?
    Changes are need to fix the bug.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Have added unit test.
    
    Closes #35325 from otterc/SPARK-37675.
    
    Authored-by: Chandni Singh <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   | 171 ++++++++++-----------
 .../shuffle/RemoteBlockPushResolverSuite.java      |  77 +++++++++-
 2 files changed, 158 insertions(+), 90 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d626cc3..b823076 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -83,20 +83,11 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   public static final String MERGE_DIR_KEY = "mergeDir";
   public static final String ATTEMPT_ID_KEY = "attemptId";
   private static final int UNDEFINED_ATTEMPT_ID = -1;
-  // Shuffles of determinate stages will have shuffleMergeId set to 0
-  private static final int DETERMINATE_SHUFFLE_MERGE_ID = 0;
   private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = 
createErrorHandler();
   // ByteBuffer to respond to client upon a successful merge of a pushed block
   private static final ByteBuffer SUCCESS_RESPONSE =
     new BlockPushReturnCode(ReturnCode.SUCCESS.id(), 
"").toByteBuffer().asReadOnlyBuffer();
 
-  // ConcurrentHashMap doesn't allow null for keys or values which is why this 
is required.
-  // Marker to identify finalized indeterminate shuffle partitions in the case 
of indeterminate
-  // stage retries.
-  @VisibleForTesting
-  public static final Map<Integer, AppShufflePartitionInfo> 
INDETERMINATE_SHUFFLE_FINALIZED =
-    Collections.emptyMap();
-
   /**
    * A concurrent hashmap where the key is the applicationId, and the value 
includes
    * all the merged shuffle information for this application. AppShuffleInfo 
stores
@@ -169,59 +160,45 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       String blockId) throws BlockPushNonFatalFailure {
     ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles = 
appShuffleInfo.shuffles;
     AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
-      shuffles.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
-        if (appShuffleMergePartitionsInfo == null) {
-          File dataFile =
-            appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, 
reduceId);
-          // If this partition is already finalized then the partitions map 
will not contain the
-          // shuffleId for determinate stages but the data file would exist.
-          // In that case the block is considered late. In the case of 
indeterminate stages, most
-          // recent shuffleMergeId finalized would be pointing to 
INDETERMINATE_SHUFFLE_FINALIZED
-          if (dataFile.exists()) {
-            throw new BlockPushNonFatalFailure(new BlockPushReturnCode(
-              ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
-              BlockPushNonFatalFailure.getErrorMsg(blockId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
-          } else {
-            logger.info("Creating a new attempt for shuffle blocks push 
request for shuffle {}"
-              + " with shuffleMergeId {} for application {}_{}", shuffleId, 
shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId);
-            return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
-          }
+      shuffles.compute(shuffleId, (id, mergePartitionsInfo) -> {
+        if (mergePartitionsInfo == null) {
+          logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a 
new shuffle " +
+              "merge metadata", appShuffleInfo.appId, 
appShuffleInfo.attemptId, shuffleId,
+              shuffleMergeId);
+          return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
         } else {
-          // Reject the request as we have already seen a higher 
shuffleMergeId than the
-          // current incoming one
-          int latestShuffleMergeId = 
appShuffleMergePartitionsInfo.shuffleMergeId;
+          int latestShuffleMergeId = mergePartitionsInfo.shuffleMergeId;
           if (latestShuffleMergeId > shuffleMergeId) {
+            // Reject the request as we have already seen a higher 
shuffleMergeId than the one
+            // in the current request.
             throw new BlockPushNonFatalFailure(
               new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), 
blockId).toByteBuffer(),
               BlockPushNonFatalFailure.getErrorMsg(blockId, 
ReturnCode.STALE_BLOCK_PUSH));
-          } else if (latestShuffleMergeId == shuffleMergeId) {
-            return appShuffleMergePartitionsInfo;
-          } else {
+          } else if (latestShuffleMergeId < shuffleMergeId){
             // Higher shuffleMergeId seen for the shuffle ID meaning new stage 
attempt is being
             // run for the shuffle ID. Close and clean up old shuffleMergeId 
files,
             // happens in the indeterminate stage retries
-            logger.info("Creating a new attempt for shuffle blocks push 
request for shuffle {}"
-              + " with shuffleMergeId {} for application {}_{} since it is 
higher than the"
-              + " latest shuffleMergeId {} already seen", shuffleId, 
shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId, 
latestShuffleMergeId);
+            logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a 
new shuffle " +
+                "merge metadata since received shuffleMergeId is higher than 
latest " +
+                "shuffleMergeId {}", appShuffleInfo.appId, 
appShuffleInfo.attemptId, shuffleId,
+                shuffleMergeId, latestShuffleMergeId);
             mergedShuffleCleaner.execute(() ->
-              
closeAndDeletePartitionFiles(appShuffleMergePartitionsInfo.shuffleMergePartitions));
+                
closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
             return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
+          } else {
+            // The request is for block with same shuffleMergeId as the latest 
shuffleMergeId
+            if (mergePartitionsInfo.isFinalized()) {
+              throw new BlockPushNonFatalFailure(
+                  new BlockPushReturnCode(
+                      ReturnCode.TOO_LATE_BLOCK_PUSH.id(), 
blockId).toByteBuffer(),
+                  BlockPushNonFatalFailure.getErrorMsg(blockId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
+            }
+            return mergePartitionsInfo;
           }
         }
       });
-
-    // It only gets here when the shuffle is already finalized.
-    if (null == shufflePartitionsWithMergeId ||
-        INDETERMINATE_SHUFFLE_FINALIZED == 
shufflePartitionsWithMergeId.shuffleMergePartitions) {
-      throw new BlockPushNonFatalFailure(
-        new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), 
blockId).toByteBuffer(),
-        BlockPushNonFatalFailure.getErrorMsg(blockId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
-    }
-
     Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
-      shufflePartitionsWithMergeId.shuffleMergePartitions;
+        shufflePartitionsWithMergeId.shuffleMergePartitions;
     return shuffleMergePartitions.computeIfAbsent(reduceId, key -> {
       // It only gets here when the key is not present in the map. The first 
time the merge
       // manager receives a pushed block for a given application shuffle 
partition.
@@ -235,9 +212,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId, 
shuffleMergeId,
           reduceId, dataFile, indexFile, metaFile);
       } catch (IOException e) {
-        logger.error(
-          "Cannot create merged shuffle partition with data file {}, index 
file {}, and "
-            + "meta file {}", dataFile.getAbsolutePath(),
+        logger.error("{} attempt {} shuffle {} shuffleMerge {}: cannot create 
merged shuffle " +
+            "partition with data file {}, index file {}, and meta file {}", 
appShuffleInfo.appId,
+            appShuffleInfo.attemptId, shuffleId, shuffleMergeId, 
dataFile.getAbsolutePath(),
             indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
         throw new RuntimeException(
           String.format("Cannot initialize merged shuffle partition for appId 
%s shuffleId %s "
@@ -350,6 +327,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
    * If cleanupLocalDirs is true, the merged shuffle files will also be 
deleted.
    * The cleanup will be executed in a separate thread.
    */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @VisibleForTesting
   void closeAndDeletePartitionFilesIfNeeded(
       AppShuffleInfo appShuffleInfo,
@@ -512,10 +490,11 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
   }
 
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
-    logger.info("Finalizing shuffle {} with shuffleMergeId {} from Application 
{}_{}.",
-      msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+    logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalize shuffle 
merge",
+        msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
     if (appShuffleInfo.attemptId != msg.appAttemptId) {
       // If finalizeShuffleMerge from a former application attempt, it is 
considered late,
@@ -534,35 +513,33 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> 
shuffleMergePartitionsRef =
       new AtomicReference<>(null);
-    // Metadata of the determinate stage shuffle can be safely removed as part 
of finalizing
-    // shuffle merge. Currently once the shuffle is finalized for a 
determinate stages, retry
-    // stages of the same shuffle will have shuffle push disabled.
-    if (msg.shuffleMergeId == DETERMINATE_SHUFFLE_MERGE_ID) {
-      AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo =
-        appShuffleInfo.shuffles.remove(msg.shuffleId);
-      if (appShuffleMergePartitionsInfo != null) {
-        
shuffleMergePartitionsRef.set(appShuffleMergePartitionsInfo.shuffleMergePartitions);
-      }
-    } else {
-      appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> {
-        if (null == value || msg.shuffleMergeId < value.shuffleMergeId ||
-          INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (null != mergePartitionsInfo) {
+        if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId ||
+          mergePartitionsInfo.isFinalized()) {
           throw new RuntimeException(String.format(
-            "Shuffle merge finalize request for shuffle %s with" + " 
shuffleMergeId %s is %s",
-            msg.shuffleId, msg.shuffleMergeId,
-            ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
-        } else if (msg.shuffleMergeId > value.shuffleMergeId) {
+              "Shuffle merge finalize request for shuffle %s with" + " 
shuffleMergeId %s is %s",
+              msg.shuffleId, msg.shuffleMergeId,
+              
ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+        } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
           mergedShuffleCleaner.execute(() ->
-            closeAndDeletePartitionFiles(value.shuffleMergePartitions));
-          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+              
closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
         } else {
-          shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
-          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+          // This block covers:
+          //  1. finalization of determinate stage
+          //  2. finalization of indeterminate stage if the shuffleMergeId 
related to it is the one
+          //  for which the message is received.
+          
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
         }
-      });
-    }
+      }
+      // Even when the mergePartitionsInfo is null, we mark the shuffle as 
finalized but the results
+      // sent to the driver will be empty. This cam happen when the service 
didn't receive any
+      // blocks for the shuffle yet and the driver didn't wait for enough time 
to finalize the
+      // shuffle.
+      return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+    });
     Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = 
shuffleMergePartitionsRef.get();
     MergeStatuses mergeStatuses;
     if (null == shuffleMergePartitions || shuffleMergePartitions.isEmpty()) {
@@ -576,14 +553,25 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) 
{
         synchronized (partition) {
           try {
+            logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalizing 
shuffle " +
+                "partition {} ", msg.appId, msg.appAttemptId, msg.shuffleId,
+                msg.shuffleMergeId, partition.reduceId);
             // This can throw IOException which will marks this shuffle 
partition as not merged.
             partition.finalizePartition();
-            bitmaps.add(partition.mapTracker);
-            reduceIds.add(partition.reduceId);
-            sizes.add(partition.getLastChunkOffset());
+            if (partition.mapTracker.getCardinality() > 0) {
+              bitmaps.add(partition.mapTracker);
+              reduceIds.add(partition.reduceId);
+              sizes.add(partition.getLastChunkOffset());
+              logger.debug("{} attempt {} shuffle {} shuffleMerge {}: 
finalization results " +
+                  "added for partition {} data size {} index size {} meta size 
{}",
+                  msg.appId, msg.appAttemptId, msg.shuffleId,
+                  msg.shuffleMergeId, partition.reduceId, 
partition.getLastChunkOffset(),
+                  partition.indexFile.getPos(), partition.metaFile.getPos());
+            }
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {}_{} {} 
{}", msg.appId,
-              msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("{} attempt {} shuffle {} shuffleMerge {}: exception 
while " +
+                "finalizing shuffle partition {}", msg.appId, 
msg.appAttemptId, msg.shuffleId,
+                msg.shuffleMergeId, partition.reduceId);
           } finally {
             partition.closeAllFilesAndDeleteIfNeeded(false);
           }
@@ -593,8 +581,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    logger.info("Finalized shuffle {} with shuffleMergeId {} from Application 
{}_{}.",
-      msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
+    logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of 
shuffle merge completed",
+        msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
     return mergeStatuses;
   }
 
@@ -808,7 +796,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
         int reduceId) {
       return null == appShuffleMergePartitionsInfo ||
-        INDETERMINATE_SHUFFLE_FINALIZED == 
appShuffleMergePartitionsInfo.shuffleMergePartitions ||
+        appShuffleMergePartitionsInfo.isFinalized() ||
         
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
     }
 
@@ -1008,20 +996,27 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
    * required for the shuffles of indeterminate stages.
    */
   public static class AppShuffleMergePartitionsInfo {
+    // ConcurrentHashMap doesn't allow null for keys or values which is why 
this is required.
+    // Marker to identify finalized shuffle partitions.
+    private static final Map<Integer, AppShufflePartitionInfo> 
SHUFFLE_FINALIZED_MARKER =
+        Collections.emptyMap();
     private final int shuffleMergeId;
     private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
 
-    public AppShuffleMergePartitionsInfo(
-        int shuffleMergeId, boolean shuffleFinalized) {
+    public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean 
shuffleFinalized) {
       this.shuffleMergeId = shuffleMergeId;
-      this.shuffleMergePartitions = shuffleFinalized ?
-        INDETERMINATE_SHUFFLE_FINALIZED : new ConcurrentHashMap<>();
+      this.shuffleMergePartitions = shuffleFinalized ? 
SHUFFLE_FINALIZED_MARKER :
+          new ConcurrentHashMap<>();
     }
 
     @VisibleForTesting
     public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
       return shuffleMergePartitions;
     }
+
+    public boolean isFinalized() {
+      return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER;
+    }
   }
 
   /** Metadata tracked for an actively merged shuffle partition */
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index f4a29aa..5954733 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -1161,8 +1161,8 @@ public class RemoteBlockPushResolverSuite {
 
     RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
       pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
-    assertTrue("Metadata of determinate shuffle should be removed after 
finalize shuffle"
-      + " merge", appShuffleInfo.getShuffles().get(0) == null);
+    assertTrue("Determinate shuffle should be marked finalized",
+        appShuffleInfo.getShuffles().get(0).isFinalized());
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new 
int[][]{{0}, {1}});
@@ -1287,6 +1287,79 @@ public class RemoteBlockPushResolverSuite {
       + " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists());
   }
 
+  @Test
+  public void 
testFinalizationResultIsEmptyWhenTheServerDidNotReceiveAnyBlocks() {
+    //shuffle 1 0 is finalized even though the server didn't receive any 
blocks for it.
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+        new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+    assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+    RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+        pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
+    assertTrue("shuffle 1 should be marked finalized",
+        appShuffleInfo.getShuffles().get(1).isFinalized());
+    removeApplication(TEST_APP);
+  }
+
+  // Test for SPARK-37675 and SPARK-37793
+  @Test
+  public void testEmptyMergePartitionsAreNotReported() throws IOException {
+    //shufflePush_1_0_0_100 is received by the server
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+    //shuffle 1 0 is finalized
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+        new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 1, 0));
+    assertEquals("no partitions were merged", 0, statuses.reduceIds.length);
+    removeApplication(TEST_APP);
+  }
+
+  // Test for SPARK-37675 and SPARK-37793
+  @Test
+  public void testAllBlocksAreRejectedWhenReceivedAfterFinalization() throws 
IOException {
+    //shufflePush_1_0_0_100 is received by the server
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 100, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+    stream1.onComplete(stream1.getID());
+    //shuffle 1 0 is finalized
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 1, 0));
+    BlockPushNonFatalFailure errorToValidate = null;
+    try {
+      //shufflePush_1_0_0_200 is received by the server after finalization of 
shuffle 1 0 which
+      //should be rejected
+      StreamCallbackWithID failureCallback = 
pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 0, 200, 0));
+      failureCallback.onComplete(failureCallback.getID());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+          (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+          errorCode.returnCode);
+      errorToValidate = e;
+      assertEquals(errorCode.failureBlockId, "shufflePush_1_0_0_200");
+    }
+    assertNotNull("shufflePush_1_0_0_200 should be rejected", errorToValidate);
+    try {
+      //shufflePush_1_0_1_100 is received by the server after finalization of 
shuffle 1 0 which
+      //should also be rejected
+      StreamCallbackWithID failureCallback = 
pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 1, 0, 1, 100, 0));
+      failureCallback.onComplete(failureCallback.getID());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+          (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+          errorCode.returnCode);
+      errorToValidate = e;
+      assertEquals(errorCode.failureBlockId, "shufflePush_1_0_1_100");
+    }
+    assertNotNull("shufflePush_1_0_1_100 should be rejected", errorToValidate);
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 1, 
0, 100);
+    validateChunks(TEST_APP, 1, 0, 100, blockMeta, new int[]{4}, new 
int[][]{{0}});
+    removeApplication(TEST_APP);
+  }
+
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) 
throws IOException {
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to