Repository: tez
Updated Branches:
  refs/heads/master 24475acc7 -> b3e20c7c6


TEZ-3769. Unordered: Fix wrong stats being sent out in the last event when final merge is disabled (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b3e20c7c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b3e20c7c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b3e20c7c

Branch: refs/heads/master
Commit: b3e20c7c69125645797674d20c01802e9a409558
Parents: 24475ac
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Jun 28 13:37:09 2017 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed Jun 28 13:37:09 2017 +0530

----------------------------------------------------------------------
 .../writers/UnorderedPartitionedKVWriter.java   | 134 +++++----
 .../TestUnorderedPartitionedKVWriter.java       | 273 ++++++++++++++++++-
 2 files changed, 352 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b3e20c7c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 70c577c..6ea0385 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -172,7 +172,8 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
 
   //for single partition cases (e.g UnorderedKVOutput)
   private final IFile.Writer writer;
-  private final boolean skipBuffers;
+  @VisibleForTesting
+  final boolean skipBuffers;
 
   private final ReentrantLock spillLock = new ReentrantLock();
   private final Condition spillInProgress = spillLock.newCondition();
@@ -285,6 +286,9 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
         + "numBuffers=" + numBuffers
         + ", sizePerBuffer=" + sizePerBuffer
         + ", skipBuffers=" + skipBuffers
+        + ", numPartitions=" + numPartitions
+        + ", availableMemory=" + availableMemory
+        + ", maxSingleBufferSizeBytes=" + maxSingleBufferSizeBytes
         + ", pipelinedShuffle=" + pipelinedShuffle
         + ", isFinalMergeEnabled=" + isFinalMergeEnabled
         + ", numPartitions=" + numPartitions
@@ -558,17 +562,14 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
 
     public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec 
codec,
         TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) {
-      this.filledBuffers = filledBuffers;
-      this.codec = codec;
-      this.numRecordsCounter = numRecordsCounter;
-      this.spillIndex = spillPathDetails.spillIndex;
+      this(filledBuffers, codec, numRecordsCounter, 
spillPathDetails.spillIndex);
       Preconditions.checkArgument(spillPathDetails.outputFilePath != null, 
"Spill output file "
           + "path can not be null");
       this.spillPathDetails = spillPathDetails;
     }
 
     public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec 
codec,
-        TezCounter numRecordsCounter, int spillNumber) throws IOException {
+        TezCounter numRecordsCounter, int spillNumber) {
       this.filledBuffers = filledBuffers;
       this.codec = codec;
       this.numRecordsCounter = numRecordsCounter;
@@ -720,47 +721,66 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
             emptyPartitions.set(0);
           }
           if (reportPartitionStats()) {
-            sizePerPartition[0] = rawLen;
+            if (outputRecordsCounter.getValue() > 0) {
+              sizePerPartition[0] = rawLen;
+            }
           }
           cleanupCurrentBuffer();
 
-          outputBytesWithOverheadCounter.increment(rawLen);
-          fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
+          if (outputRecordsCounter.getValue() > 0) {
+            outputBytesWithOverheadCounter.increment(rawLen);
+            fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
+          }
           eventList.add(generateVMEvent());
           eventList.add(generateDMEvent(false, -1, false, outputContext
               .getUniqueIdentifier(), emptyPartitions));
           return eventList;
         }
 
-        //Regular code path.
-        boolean updatedCounters = false;
-        if (numSpills.get() > 0 && isFinalMergeEnabled) {
-          mergeAll();
-        } else {
-          if (finalSpill() && !isFinalMergeEnabled) {
-            //final spill generated some data. Add it to final events
-            updateTezCountersAndNotify();
-            updatedCounters = true;
-            finalEvents.add(generateVMEvent());
-            finalEvents.add(generateDMEvent());
+        /*
+          1. Final merge enabled
+             - If spills already exist, mergeAll, generate events and return
+             - If there are no existing spills, run finalSpill and generate events
+          2. Final merge disabled
+             - If finalSpill generated data, generate events and return
+             - If finalSpill generated no data, it adds its own events to finalEvents
+         */
+        if (isFinalMergeEnabled) {
+          if (numSpills.get() > 0) {
+            mergeAll();
+          } else {
+            finalSpill();
           }
-        }
-        if (!updatedCounters) {
           updateTezCountersAndNotify();
-        }
-        cleanupCurrentBuffer();
-        if (isFinalMergeEnabled) {
           eventList.add(generateVMEvent());
           eventList.add(generateDMEvent());
         } else {
-          //all events to be sent out are added in finalEvents.
+          // if no data is generated, finalSpill would create VMEvent & add to 
finalEvents
+          SpillResult result = finalSpill();
+          if (result != null) {
+            updateTezCountersAndNotify();
+            // Generate vm event
+            finalEvents.add(generateVMEvent());
+
+            // compute empty partitions based on spill result and generate DME
+            int spillNum = numSpills.get() - 1;
+            SpillCallback callback = new SpillCallback(spillNum);
+            callback.computePartitionStats(result);
+            BitSet emptyPartitions = 
getEmptyPartitions(callback.getRecordsPerPartition());
+            String pathComponent = 
generatePathComponent(outputContext.getUniqueIdentifier(), spillNum);
+            Event finalEvent = generateDMEvent(true, spillNum,
+                true, pathComponent, emptyPartitions);
+            finalEvents.add(finalEvent);
+          }
+          //all events to be sent out are in finalEvents.
           eventList.addAll(finalEvents);
         }
+        cleanupCurrentBuffer();
         return eventList;
       }
 
       //For pipelined case, send out an event in case finalspill generated a 
spill file.
-      if (finalSpill()) {
+      if (finalSpill() != null) {
         // VertexManagerEvent is only sent at the end and thus 
sizePerPartition is used
         // for the sum of all spills.
         mayBeSendEventsForSpill(currentBuffer.recordsPerPartition,
@@ -848,41 +868,44 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     availableBuffers.clear();
   }
 
-  private boolean finalSpill() throws IOException {
+  private SpillResult finalSpill() throws IOException {
     if (currentBuffer.nextPosition == 0) {
       if (pipelinedShuffle || !isFinalMergeEnabled) {
         List<Event> eventList = Lists.newLinkedList();
         eventList.add(ShuffleUtils.generateVMEvent(outputContext,
             reportPartitionStats() ? new long[numPartitions] : null,
-                reportDetailedPartitionStats(), deflater.get()));
-        //Send final event with all empty partitions and null path component.
-        BitSet emptyPartitions = new BitSet(numPartitions);
-        emptyPartitions.flip(0, numPartitions);
-        eventList.add(generateDMEvent(true, numSpills.get(), true,
-            null, emptyPartitions));
+            reportDetailedPartitionStats(), deflater.get()));
+        if (localOutputRecordsCounter == 0 && 
outputLargeRecordsCounter.getValue() == 0) {
+          // Should send this event (all empty partitions) only when no 
records are written out.
+          BitSet emptyPartitions = new BitSet(numPartitions);
+          emptyPartitions.flip(0, numPartitions);
+          eventList.add(generateDMEvent(true, numSpills.get(), true,
+              null, emptyPartitions));
+        }
         if (pipelinedShuffle) {
           outputContext.sendEvents(eventList);
         } else if (!isFinalMergeEnabled) {
-          finalEvents.addAll(eventList);
+          finalEvents.addAll(0, eventList);
         }
       }
-      return false;
+      return null;
     } else {
       updateGlobalStats(currentBuffer);
       filledBuffers.add(currentBuffer);
 
       //setup output file and index file
       SpillPathDetails spillPathDetails = getSpillPathDetails(true, -1);
-      SpillCallable spillCallable = new SpillCallable(filledBuffers, codec, 
null, spillPathDetails);
+      SpillCallable spillCallable = new SpillCallable(filledBuffers,
+          codec, null, spillPathDetails);
       try {
         SpillResult spillResult = spillCallable.call();
 
         fileOutputBytesCounter.increment(spillResult.spillSize);
         fileOutputBytesCounter.increment(indexFileSizeEstimate);
+        return spillResult;
       } catch (Exception ex) {
         throw (ex instanceof IOException) ? (IOException)ex : new 
IOException(ex);
       }
-      return true;
     }
 
   }
@@ -934,8 +957,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       indexFilePath  = 
outputFileHandler.getSpillIndexFileForWrite(spillNumber, indexFileSizeEstimate);
     }
 
-    SpillPathDetails spillDetails = new SpillPathDetails(outputFilePath, 
indexFilePath, spillNumber);
-    return spillDetails;
+    return new SpillPathDetails(outputFilePath, indexFilePath, spillNumber);
   }
 
   private void mergeAll() throws IOException {
@@ -1184,12 +1206,16 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     }
   }
 
+  private String generatePathComponent(String uniqueId, int spillNumber) {
+    return (uniqueId + "_" + spillNumber);
+  }
+
   private List<Event> generateEventForSpill(BitSet emptyPartitions, long[] 
sizePerPartition,
       int spillNumber,
       boolean isFinalUpdate) throws IOException {
     List<Event> eventList = Lists.newLinkedList();
     //Send out an event for consuming.
-    String pathComponent = (outputContext.getUniqueIdentifier() + "_" + 
spillNumber);
+    String pathComponent = 
generatePathComponent(outputContext.getUniqueIdentifier(), spillNumber);
     if (isFinalUpdate) {
       eventList.add(ShuffleUtils.generateVMEvent(outputContext,
           sizePerPartition, reportDetailedPartitionStats(), deflater.get()));
@@ -1237,19 +1263,14 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
   private class SpillCallback implements FutureCallback<SpillResult> {
 
     private final int spillNumber;
+    private int recordsPerPartition[];
+    private long sizePerPartition[];
 
     SpillCallback(int spillNumber) {
       this.spillNumber = spillNumber;
     }
 
-    @Override
-    public void onSuccess(SpillResult result) {
-      synchronized (UnorderedPartitionedKVWriter.this) {
-        spilledSize += result.spillSize;
-      }
-
-      int recordsPerPartition[] = null;
-      long sizePerPartition[] = null;
+    void computePartitionStats(SpillResult result) {
       if (result.filledBuffers.size() == 1) {
         recordsPerPartition = result.filledBuffers.get(0).recordsPerPartition;
         sizePerPartition = result.filledBuffers.get(0).sizePerPartition;
@@ -1263,6 +1284,19 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
           }
         }
       }
+    }
+
+    int[] getRecordsPerPartition() {
+      return recordsPerPartition;
+    }
+
+    @Override
+    public void onSuccess(SpillResult result) {
+      synchronized (UnorderedPartitionedKVWriter.this) {
+        spilledSize += result.spillSize;
+      }
+
+      computePartitionStats(result);
 
       mayBeSendEventsForSpill(recordsPerPartition, sizePerPartition, 
spillNumber, false);
 
@@ -1276,7 +1310,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
         outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure 
while attempting to reset buffer after spill");
       }
 
-      if (!pipelinedShuffle) {
+      if (!pipelinedShuffle && isFinalMergeEnabled) {
         synchronized(additionalSpillBytesWritternCounter) {
           additionalSpillBytesWritternCounter.increment(result.spillSize);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/b3e20c7c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 71bd240..f1cea7e 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -19,13 +19,13 @@ package org.apache.tez.runtime.library.common.writers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -47,12 +47,13 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.protobuf.ByteString;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import 
org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
 import org.roaringbitmap.RoaringBitmap;
@@ -89,7 +90,6 @@ import 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 import 
org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import 
org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.junit.After;
 import org.junit.Before;
@@ -275,6 +275,12 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   @Test(timeout = 10000)
+  public void testNoRecords_SinglePartition() throws IOException, 
InterruptedException {
+    // skipBuffers
+    baseTest(0, 1, null, shouldCompress, -1, 0);
+  }
+
+  @Test(timeout = 10000)
   public void testSkippedPartitions() throws IOException, InterruptedException 
{
     baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress, -1, 0);
   }
@@ -614,6 +620,12 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   @Test(timeout = 10000)
+  public void testNoRecords_SinglePartition_WithPipelinedShuffle() throws 
IOException, InterruptedException {
+    // skipBuffers
+    baseTestWithPipelinedTransfer(0, 1, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
   public void testSkippedPartitions_WithPipelinedShuffle() throws IOException, 
InterruptedException {
     baseTestWithPipelinedTransfer(200, 10, Sets.newHashSet(2, 5), 
shouldCompress);
   }
@@ -681,6 +693,11 @@ public class TestUnorderedPartitionedKVWriter {
 
     ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
     List<Event> lastEvents = kvWriter.close();
+
+    if (numPartitions == 1) {
+      assertEquals(false, kvWriter.skipBuffers);
+    }
+
     //no events are sent to kvWriter upon close with pipelining
     assertTrue(lastEvents.size() == 0);
     verify(outputContext, 
atLeast(numExpectedSpills)).sendEvents(eventCaptor.capture());
@@ -753,7 +770,6 @@ public class TestUnorderedPartitionedKVWriter {
     //No additional spills when final merge is disabled.
     assertEquals(numAdditionalSpillsCounter.getValue(), 0);
 
-    BitSet emptyPartitionBits = null;
     assertTrue(lastEvents.size() > 0);
     //Get the last event
     int index = lastEvents.size() - 1;
@@ -767,10 +783,31 @@ public class TestUnorderedPartitionedKVWriter {
             ByteString.copyFrom(cdme.getUserPayload()));
     //Ensure that this is the last event
     assertTrue(eventProto.getLastEvent());
+    verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, 
skippedPartitions);
+
+    verify(outputContext, atLeast(1)).notifyProgress();
+
+    // Verify if all spill files are available.
+    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
+
+    if (numRecordsWritten > 0) {
+      int numSpills = kvWriter.numSpills.get();
+      for (int i = 0; i < numSpills; i++) {
+        assertTrue(localFs.exists(taskOutput.getSpillFileForWrite(i, 10)));
+        assertTrue(localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 
10)));
+      }
+    } else {
+      return;
+    }
+  }
+
+  private void verifyEmptyPartitions(DataMovementEventPayloadProto eventProto,
+      int numRecordsWritten, int numPartitions, Set<Integer> skippedPartitions)
+      throws IOException {
     if (eventProto.hasEmptyPartitions()) {
       byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(
           eventProto.getEmptyPartitions());
-      emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
+      BitSet emptyPartitionBits = 
TezUtilsInternal.fromByteArray(emptyPartitions);
       if (numRecordsWritten == 0) {
         assertEquals(numPartitions, emptyPartitionBits.cardinality());
       } else {
@@ -795,6 +832,228 @@ public class TestUnorderedPartitionedKVWriter {
       assertEquals(SHUFFLE_PORT, eventProto.getPort());
       assertTrue(eventProto.hasPathComponent());
     }
+  }
+
+  @Test(timeout = 10000)
+  public void testNoSpill_WithFinalMergeDisabled() throws IOException, 
InterruptedException {
+    baseTestWithFinalMergeDisabled(10, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testSingleSpill_WithFinalMergeDisabled() throws IOException, 
InterruptedException {
+    baseTestWithFinalMergeDisabled(50, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testSinglePartition_WithFinalMergeDisabled() throws IOException, 
InterruptedException {
+    baseTestWithFinalMergeDisabled(0, 1, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testMultipleSpills_WithFinalMergeDisabled() throws IOException, 
InterruptedException {
+    baseTestWithFinalMergeDisabled(200, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testNoRecords_WithFinalMergeDisabled() throws IOException, 
InterruptedException {
+    baseTestWithFinalMergeDisabled(0, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testNoRecords_SinglePartition_WithFinalMergeDisabled() throws 
IOException, InterruptedException {
+    baseTestWithFinalMergeDisabled(0, 1, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testSkippedPartitions_WithFinalMergeDisabled() throws 
IOException, InterruptedException {
+    baseTestWithFinalMergeDisabled(200, 10, Sets.newHashSet(2, 5), 
shouldCompress);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void baseTestWithFinalMergeDisabled(int numRecords, int 
numPartitions,
+      Set<Integer> skippedPartitions, boolean shouldCompress) throws 
IOException, InterruptedException {
+
+    PartitionerForTest partitioner = new PartitionerForTest();
+    ApplicationId appId = ApplicationId.newInstance(10000000, 1);
+    TezCounters counters = new TezCounters();
+    String uniqueId = UUID.randomUUID().toString();
+    int dagId = 1;
+    String auxiliaryService = 
defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, 
uniqueId, auxiliaryService);
+
+    Configuration conf = createConfiguration(outputContext, IntWritable.class, 
LongWritable.class,
+        shouldCompress, -1);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false);
+
+    CompressionCodec codec = null;
+    if (shouldCompress) {
+      codec = new DefaultCodec();
+      ((Configurable) codec).setConf(conf);
+    }
+
+    int numOutputs = numPartitions;
+    long availableMemory = 2048;
+    int numRecordsWritten = 0;
+
+    UnorderedPartitionedKVWriter kvWriter = new 
UnorderedPartitionedKVWriterForTest(outputContext,
+        conf, numOutputs, availableMemory);
+
+    int sizePerBuffer = kvWriter.sizePerBuffer;
+    int sizePerRecord = 4 + 8; // IntW + LongW
+    int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + 
META_OVERHEAD
+
+    BitSet partitionsWithData = new BitSet(numPartitions);
+    IntWritable intWritable = new IntWritable();
+    LongWritable longWritable = new LongWritable();
+    for (int i = 0; i < numRecords; i++) {
+      intWritable.set(i);
+      longWritable.set(i);
+      int partition = partitioner.getPartition(intWritable, longWritable, 
numOutputs);
+      if (skippedPartitions != null && skippedPartitions.contains(partition)) {
+        continue;
+      }
+      partitionsWithData.set(partition);
+      kvWriter.write(intWritable, longWritable);
+      numRecordsWritten++;
+    }
+
+    int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
+    int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
+
+    ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
+    List<Event> lastEvents = kvWriter.close();
+
+    if (numPartitions == 1) {
+      assertEquals(true, kvWriter.skipBuffers);
+    }
+
+    // max events sent are spills + one VM event. If there are no spills, at least
+    // empty partitions would be sent out finally.
+    int spills = Math.max(1, kvWriter.numSpills.get());
+    assertEquals((spills + 1), lastEvents.size()); //spills + VMEvent
+    verify(outputContext, atMost(0)).sendEvents(eventCaptor.capture());
+
+    for (int i=0; i<lastEvents.size(); i++) {
+      Event event =lastEvents.get(i);
+      if (event instanceof VertexManagerEvent) {
+        //when there are no records, empty IFile with 6 bytes would be created 
which would add up
+        // to stats.
+        if (numRecordsWritten > 0) {
+          verifyPartitionStats(((VertexManagerEvent) event), 
partitionsWithData);
+        }
+      }
+    }
+
+    verify(outputContext, never()).reportFailure(any(TaskFailureType.class),
+        any(Throwable.class), any(String.class));
+
+    assertNull(kvWriter.currentBuffer);
+    assertEquals(0, kvWriter.availableBuffers.size());
+
+    // Verify the counters
+    TezCounter outputRecordBytesCounter =
+        counters.findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter outputRecordsCounter =
+        counters.findCounter(TaskCounter.OUTPUT_RECORDS);
+    TezCounter outputBytesWithOverheadCounter =
+        counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    TezCounter fileOutputBytesCounter =
+        counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    TezCounter spilledRecordsCounter =
+        counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter additionalSpillBytesWritternCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    TezCounter additionalSpillBytesReadCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+    TezCounter numAdditionalSpillsCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+    assertEquals(numRecordsWritten * sizePerRecord,
+        outputRecordBytesCounter.getValue());
+    assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
+    if (outputRecordsCounter.getValue() > 0) {
+      assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
+          outputBytesWithOverheadCounter.getValue());
+    } else {
+      assertEquals(0, outputBytesWithOverheadCounter.getValue());
+    }
+    long fileOutputBytes = fileOutputBytesCounter.getValue();
+    if (numRecordsWritten > 0) {
+      assertTrue(fileOutputBytes > 0);
+      if (!shouldCompress) {
+        assertTrue("fileOutputBytes=" + fileOutputBytes + ", 
outputRecordBytes="
+                +outputRecordBytesCounter.getValue(),
+            fileOutputBytes > outputRecordBytesCounter.getValue());
+      }
+    } else {
+      assertEquals(0, fileOutputBytes);
+    }
+    // due to multiple threads, buffers could be merged in chunks in 
scheduleSpill.
+    assertTrue(recordsPerBuffer * numExpectedSpills >= 
spilledRecordsCounter.getValue());
+    long additionalSpillBytesWritten = 
additionalSpillBytesWritternCounter.getValue();
+    long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
+
+    //No additional spill bytes written when final merge is disabled.
+    assertEquals(additionalSpillBytesWritten, 0);
+
+    //Additional spill bytes read must equal bytes written (both 0) when final merge is disabled.
+    assertTrue(additionalSpillBytesWritten == additionalSpillBytesRead);
+
+    //No additional spills when final merge is disabled.
+    assertEquals(numAdditionalSpillsCounter.getValue(), 0);
+
+    assertTrue(lastEvents.size() > 0);
+    //Get the last event
+    int index = lastEvents.size() - 1;
+    assertTrue(lastEvents.get(index) instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme =
+        (CompositeDataMovementEvent)lastEvents.get(index);
+    assertEquals(0, cdme.getSourceIndexStart());
+    assertEquals(numOutputs, cdme.getCount());
+    DataMovementEventPayloadProto eventProto =
+        DataMovementEventPayloadProto.parseFrom(
+            ByteString.copyFrom(cdme.getUserPayload()));
+
+    verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, 
skippedPartitions);
+
+    if (outputRecordsCounter.getValue() > 0) {
+      //Ensure that this is the last event
+      assertTrue(eventProto.getLastEvent());
+    }
+
+    // Verify if all path components have spillIds when final merge is disabled
+    Pattern mergePathComponentPattern = Pattern.compile("(.*)(_\\d+)");
+    for(Event event : lastEvents) {
+      if (!(event instanceof CompositeDataMovementEvent)) {
+        continue;
+      }
+      cdme = (CompositeDataMovementEvent)event;
+      eventProto = 
DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
+
+      assertEquals(false, eventProto.getPipelined());
+      if (eventProto.hasPathComponent()) {
+        //for final merge disabled cases, it should have _spillId
+        Matcher matcher = 
mergePathComponentPattern.matcher(eventProto.getPathComponent());
+        assertTrue("spill id should be present in path component " + 
eventProto.getPathComponent(), matcher.matches());
+        assertEquals(2, matcher.groupCount());
+        assertEquals(uniqueId, matcher.group(1));
+        assertTrue("spill id should be present in path component", 
matcher.group(2) != null);
+      } else {
+        assertEquals(0, eventProto.getSpillId());
+        if (outputRecordsCounter.getValue() > 0) {
+          assertEquals(true, eventProto.getLastEvent());
+        } else {
+          byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(eventProto
+              .getEmptyPartitions());
+          BitSet emptyPartitionBits = 
TezUtilsInternal.fromByteArray(emptyPartitions);
+          assertEquals(numPartitions, emptyPartitionBits.cardinality());
+        }
+      }
+    }
+
 
     verify(outputContext, atLeast(1)).notifyProgress();
 
@@ -876,6 +1135,10 @@ public class TestUnorderedPartitionedKVWriter {
     }
     List<Event> events = kvWriter.close();
 
+    if (numPartitions == 1) {
+      assertEquals(true, kvWriter.skipBuffers);
+    }
+
     int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
     int numExpectedSpills = numRecordsWritten / recordsPerBuffer / 
kvWriter.spillLimit;
 

Reply via email to