Repository: tez
Updated Branches:
  refs/heads/branch-0.6 60dc76c8f -> 71fa843c0


TEZ-2237. Valid events should be sent out when an Output is not started. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/71fa843c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/71fa843c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/71fa843c

Branch: refs/heads/branch-0.6
Commit: 71fa843c0c70abd45c35d95b056c90f4ade258cf
Parents: 60dc76c
Author: Siddharth Seth <[email protected]>
Authored: Mon May 4 16:31:12 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Mon May 4 16:31:12 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../library/common/shuffle/ShuffleUtils.java    | 64 ++++++++++++++++
 .../output/OrderedPartitionedKVOutput.java      | 15 +++-
 .../output/UnorderedPartitionedKVOutput.java    | 11 ++-
 .../library/output/OutputTestHelpers.java       | 49 ++++++++++++
 .../output/TestOrderedPartitionedKVOutput2.java | 67 +++++++++++++++++
 .../library/output/TestUnorderedKVOutput2.java  | 79 ++++++++++++++++++++
 .../TestUnorderedPartitionedKVOutput2.java      | 62 +++++++++++++++
 8 files changed, 344 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 24c5f51..dc9a42e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2237. Valid events should be sent out when an Output is not started.
   TEZ-2399. Tez UI: add proper dependencies for computed properties
   TEZ-1988. Tez UI: does not work when using file:// in a browser
   TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in 
UnorderedPartitionedKVWriter.

http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index fb929e5..f686f1f 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -24,10 +24,12 @@ import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 import java.util.List;
 
 import javax.crypto.SecretKey;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,13 +38,21 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import 
org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
 import 
org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParamsBuilder;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import 
org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
 public class ShuffleUtils {
@@ -220,6 +230,60 @@ public class ShuffleUtils {
     return builder.build();
   }
 
+  /**
+   * Generate events for outputs which have not been started.
+   * @param eventList
+   * @param numPhysicalOutputs
+   * @param context
+   * @param generateVmEvent whether to generate a vm event or not
+   * @param isCompositeEvent whether to generate a CompositeDataMovementEvent 
or a DataMovementEvent
+   * @throws IOException
+   */
+  public static void generateEventsForNonStartedOutput(List<Event> eventList,
+                                                       int numPhysicalOutputs,
+                                                       OutputContext context,
+                                                       boolean generateVmEvent,
+                                                       boolean 
isCompositeEvent) throws
+      IOException {
+    DataMovementEventPayloadProto.Builder payloadBuilder = 
DataMovementEventPayloadProto
+        .newBuilder();
+
+
+    // Construct the VertexManager event if required.
+    if (generateVmEvent) {
+      ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+          ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
+      vmBuilder.setOutputSize(0);
+      VertexManagerEvent vmEvent = VertexManagerEvent.create(
+          context.getDestinationVertexName(),
+          vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+      eventList.add(vmEvent);
+    }
+
+    // Construct the DataMovementEvent
+    // Always set empty partition information since no files were generated.
+    LOG.info("Setting all " + numPhysicalOutputs + " partitions as empty for 
non-started output: ");
+    BitSet emptyPartitionDetails = new BitSet(numPhysicalOutputs);
+    emptyPartitionDetails.set(0, numPhysicalOutputs, true);
+    ByteString emptyPartitionsBytesString =
+        TezCommonUtils.compressByteArrayToByteString(
+            TezUtilsInternal.toByteArray(emptyPartitionDetails));
+    payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
+    payloadBuilder.setRunDuration(0);
+    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+    ByteBuffer dmePayload = payloadProto.toByteString().asReadOnlyByteBuffer();
+
+
+    if (isCompositeEvent) {
+      CompositeDataMovementEvent cdme =
+          CompositeDataMovementEvent.create(0, numPhysicalOutputs, dmePayload);
+      eventList.add(cdme);
+    } else {
+      DataMovementEvent dme = DataMovementEvent.create(0, dmePayload);
+      eventList.add(dme);
+    }
+  }
+
   public static String stringify(DataMovementEventPayloadProto dmProto) {
     StringBuilder sb = new StringBuilder();
     sb.append("[");

http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index b3290a5..bcdd736 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -147,9 +148,12 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
       this.endTime = System.nanoTime();
       return generateEventsOnClose();
     } else {
-      LOG.warn("Attempting to close output " + 
getContext().getDestinationVertexName()
-          + " before it was started");
-      return Collections.emptyList();
+      LOG.warn(
+          "Attempting to close output " + 
getContext().getDestinationVertexName() + " of type " +
+              this.getClass().getSimpleName() + " before it was started. 
Generating empty events");
+
+      List<Event> returnEvents = generateEmptyEvents();
+      return returnEvents;
     }
   }
   
@@ -214,6 +218,11 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
     return events;
   }
 
+  private List<Event> generateEmptyEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    ShuffleUtils.generateEventsForNonStartedOutput(eventList, 
getNumPhysicalOutputs(), getContext(), true, true);
+    return eventList;
+  }
 
   private static final Set<String> confKeys = new HashSet<String>();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 1e39535..755dfb8 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.output;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,6 +42,7 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import 
org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
 
 /**
@@ -100,7 +102,14 @@ public class UnorderedPartitionedKVOutput extends 
AbstractLogicalOutput {
     if (isStarted.get()) {
       return kvWriter.close();
     } else {
-      return Collections.emptyList();
+      LOG.warn(
+          "Attempting to close output " + 
getContext().getDestinationVertexName() + " of type " +
+              this.getClass().getSimpleName() + " before it was started. 
Generating empty events");
+      List<Event> returnEvents = new LinkedList<Event>();
+      ShuffleUtils
+          .generateEventsForNonStartedOutput(returnEvents, 
getNumPhysicalOutputs(), getContext(),
+              false, true);
+      return returnEvents;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
new file mode 100644
index 0000000..0f6e2ca
--- /dev/null
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.OutputContext;
+
+public class OutputTestHelpers {
+  static OutputContext createOutputContext(@Nullable Path workingDir) throws 
IOException {
+    OutputContext outputContext = mock(OutputContext.class);
+    Configuration conf = new TezConfiguration();
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    String workDirString = workingDir == null ? "workDir" : 
workingDir.toString();
+    String[] workingDirs = new String[]{workDirString};
+    TezCounters counters = new TezCounters();
+
+    
doReturn("destinationVertex").when(outputContext).getDestinationVertexName();
+    doReturn(payLoad).when(outputContext).getUserPayload();
+    doReturn(workingDirs).when(outputContext).getWorkDirs();
+    doReturn(200 * 1024 * 
1024l).when(outputContext).getTotalMemoryAvailableToTask();
+    doReturn(counters).when(outputContext).getCounters();
+    
doReturn(UUID.randomUUID().toString()).when(outputContext).getUniqueIdentifier();
+    return outputContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
new file mode 100644
index 0000000..2dd895f
--- /dev/null
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestOrderedPartitionedKVOutput2 {
+
+
+  @Test(timeout = 5000)
+  public void testNonStartedOutput() throws IOException {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(null);
+    int numPartitions = 10;
+    OrderedPartitionedKVOutput output = new 
OrderedPartitionedKVOutput(outputContext, numPartitions);
+    output.initialize();
+    List<Event> events = output.close();
+    assertEquals(2, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof VertexManagerEvent);
+    Event event2 = events.get(1);
+    assertTrue(event2 instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2;
+    ByteBuffer bb = cdme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+
+    byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartionsBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+    for (int i = 0 ; i < numPartitions ; i++) {
+      assertTrue(emptyPartionsBitSet.get(i));
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
new file mode 100644
index 0000000..fc0d996
--- /dev/null
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestUnorderedKVOutput2 {
+
+  private FileSystem fs;
+  Path workingDir = new Path(".", this.getClass().getName());
+
+  @Before
+  public void setUp() throws Exception {
+    fs = FileSystem.getLocal(new Configuration());
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    fs.delete(workingDir, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testNonStartedOutput() throws Exception {
+    OutputContext outputContext = 
OutputTestHelpers.createOutputContext(workingDir);
+    int numPartitions = 1;
+    UnorderedKVOutput output = new UnorderedKVOutput(outputContext, 
numPartitions);
+    output.initialize();
+    List<Event> events = output.close();
+    assertEquals(1, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof DataMovementEvent);
+    DataMovementEvent dme = (DataMovementEvent) event1;
+    ByteBuffer bb = dme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+
+    byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartionsBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+    for (int i = 0 ; i < numPartitions ; i++) {
+      assertTrue(emptyPartionsBitSet.get(i));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
new file mode 100644
index 0000000..726ff08
--- /dev/null
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestUnorderedPartitionedKVOutput2 {
+
+
+  @Test(timeout = 5000)
+  public void testNonStartedOutput() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(null);
+    int numPartitions = 1;
+    UnorderedPartitionedKVOutput output =
+        new UnorderedPartitionedKVOutput(outputContext, numPartitions);
+    output.initialize();
+    List<Event> events = output.close();
+    assertEquals(1, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1;
+    ByteBuffer bb = dme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+
+    byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartionsBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+    for (int i = 0; i < numPartitions; i++) {
+      assertTrue(emptyPartionsBitSet.get(i));
+    }
+  }
+}

Reply via email to