Repository: tez Updated Branches: refs/heads/master ba6d7e0e9 -> e762a35fd
TEZ-2237. Valid events should be sent out when an Output is not started. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e762a35f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e762a35f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e762a35f Branch: refs/heads/master Commit: e762a35fd81228f85c455b612f7cc8ff6a305e41 Parents: ba6d7e0 Author: Siddharth Seth <[email protected]> Authored: Mon May 4 16:30:06 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon May 4 16:30:06 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/shuffle/ShuffleUtils.java | 55 ++++++++++++++++ .../output/OrderedPartitionedKVOutput.java | 16 +++-- .../library/output/UnorderedKVOutput.java | 13 +++- .../output/UnorderedPartitionedKVOutput.java | 11 +++- .../library/output/OutputTestHelpers.java | 47 ++++++++++++++ .../output/TestOrderedPartitionedKVOutput2.java | 67 ++++++++++++++++++++ .../library/output/TestUnorderedKVOutput2.java | 60 ++++++++++++++++++ .../TestUnorderedPartitionedKVOutput2.java | 62 ++++++++++++++++++ 9 files changed, 325 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0027e98..5b18258 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -161,6 +161,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2237. Valid events should be sent out when an Output is not started. TEZ-1988. Tez UI: does not work when using file:// in a browser TEZ-2390. tez-tools swimlane tool fails to parse large jobs >8K containers TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 9a8b6b5..46489ed 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -33,6 +33,7 @@ import javax.crypto.SecretKey; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; +import org.apache.tez.runtime.api.events.DataMovementEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -333,6 +334,60 @@ public class ShuffleUtils { } /** + * Generate events for outputs which have not been started. + * @param eventList + * @param numPhysicalOutputs + * @param context + * @param generateVmEvent whether to generate a vm event or not + * @param isCompositeEvent whether to generate a CompositeDataMovementEvent or a DataMovementEvent + * @throws IOException + */ + public static void generateEventsForNonStartedOutput(List<Event> eventList, + int numPhysicalOutputs, + OutputContext context, + boolean generateVmEvent, + boolean isCompositeEvent) throws + IOException { + DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto + .newBuilder(); + + + // Construct the VertexManager event if required. + if (generateVmEvent) { + ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = + ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder(); + vmBuilder.setOutputSize(0); + VertexManagerEvent vmEvent = VertexManagerEvent.create( + context.getDestinationVertexName(), + vmBuilder.build().toByteString().asReadOnlyByteBuffer()); + eventList.add(vmEvent); + } + + // Construct the DataMovementEvent + // Always set empty partition information since no files were generated. + LOG.info("Setting all {} partitions as empty for non-started output", numPhysicalOutputs); + BitSet emptyPartitionDetails = new BitSet(numPhysicalOutputs); + emptyPartitionDetails.set(0, numPhysicalOutputs, true); + ByteString emptyPartitionsBytesString = + TezCommonUtils.compressByteArrayToByteString( + TezUtilsInternal.toByteArray(emptyPartitionDetails)); + payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString); + payloadBuilder.setRunDuration(0); + DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); + ByteBuffer dmePayload = payloadProto.toByteString().asReadOnlyByteBuffer(); + + + if (isCompositeEvent) { + CompositeDataMovementEvent cdme = + CompositeDataMovementEvent.create(0, numPhysicalOutputs, dmePayload); + eventList.add(cdme); + } else { + DataMovementEvent dme = DataMovementEvent.create(0, dmePayload); + eventList.add(dme); + } + } + + /** * Generate events when spill happens * * @param eventList events would be added to this list http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 40edc76..6227fb9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -185,11 +185,13 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { this.endTime = System.nanoTime(); returnEvents = generateEvents(); } else { - LOG.warn("Attempting to close output " + getContext().getDestinationVertexName() - + " before it was started"); - returnEvents = Collections.emptyList(); + LOG.warn( + "Attempting to close output {} of type {} before it was started. Generating empty events", + getContext().getDestinationVertexName(), this.getClass().getSimpleName()); + returnEvents = generateEmptyEvents(); } - + + // This works for non-started outputs since new counters will be created with an initial value of 0 long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); getContext().getStatisticsReporter().reportDataSize(outputSize); long outputRecords = getContext().getCounters() @@ -210,6 +212,12 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { return eventList; } + private List<Event> generateEmptyEvents() throws IOException { + List<Event> eventList = Lists.newLinkedList(); + ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true); + return eventList; + } + private static final Set<String> confKeys = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 2c26374..08e6ec0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -20,10 +20,12 @@ package org.apache.tez.runtime.library.output; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -125,9 +127,16 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { //TODO: Do we need to support sending payloads via events? returnEvents = kvWriter.close(); } else { - returnEvents = Collections.emptyList(); + LOG.warn( + "Attempting to close output {} of type {} before it was started. Generating empty events", + getContext().getDestinationVertexName(), this.getClass().getSimpleName()); + returnEvents = new LinkedList<Event>(); + ShuffleUtils + .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(), + false, false); } - + + // This works for non-started outputs since new counters will be created with an initial value of 0 long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); getContext().getStatisticsReporter().reportDataSize(outputSize); long outputRecords = getContext().getCounters() http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 34f2e3e..38450ee 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -20,12 +20,14 @@ package org.apache.tez.runtime.library.output; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Preconditions; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -102,9 +104,16 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { if (isStarted.get()) { returnEvents = kvWriter.close(); } else { - returnEvents = Collections.emptyList(); + LOG.warn( + "Attempting to close output {} of type {} before it was started. Generating empty events", + getContext().getDestinationVertexName(), this.getClass().getSimpleName()); + returnEvents = new LinkedList<Event>(); + ShuffleUtils + .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(), + false, true); } + // This works for non-started outputs since new counters will be created with an initial value of 0 long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); getContext().getStatisticsReporter().reportDataSize(outputSize); long outputRecords = getContext().getCounters() http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java new file mode 100644 index 0000000..db9a0ed --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.OutputStatisticsReporter; + +public class OutputTestHelpers { + static OutputContext createOutputContext() throws IOException { + OutputContext outputContext = mock(OutputContext.class); + Configuration conf = new TezConfiguration(); + UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); + String[] workingDirs = new String[]{"workDir1"}; + OutputStatisticsReporter statsReporter = mock(OutputStatisticsReporter.class); + TezCounters counters = new TezCounters(); + + doReturn("destinationVertex").when(outputContext).getDestinationVertexName(); + doReturn(payLoad).when(outputContext).getUserPayload(); + doReturn(workingDirs).when(outputContext).getWorkDirs(); + doReturn(200 * 1024 * 1024l).when(outputContext).getTotalMemoryAvailableToTask(); + doReturn(counters).when(outputContext).getCounters(); + doReturn(statsReporter).when(outputContext).getStatisticsReporter(); + return outputContext; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java new file mode 100644 index 0000000..8e76a8b --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; + +import com.google.protobuf.ByteString; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.Test; + +// Tests which don't require parameterization +public class TestOrderedPartitionedKVOutput2 { + + + @Test(timeout = 5000) + public void testNonStartedOutput() throws IOException { + OutputContext outputContext = OutputTestHelpers.createOutputContext(); + int numPartitions = 10; + OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions); + output.initialize(); + List<Event> events = output.close(); + assertEquals(2, events.size()); + Event event1 = events.get(0); + assertTrue(event1 instanceof VertexManagerEvent); + Event event2 = events.get(1); + assertTrue(event2 instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2; + ByteBuffer bb = cdme.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + assertTrue(shufflePayload.hasEmptyPartitions()); + + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload + .getEmptyPartitions()); + BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + assertEquals(numPartitions, emptyPartionsBitSet.cardinality()); + for (int i = 0 ; i < numPartitions ; i++) { + assertTrue(emptyPartionsBitSet.get(i)); + } + } + + +} http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java new file mode 100644 index 0000000..ecc1241 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; + +import com.google.protobuf.ByteString; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.Test; + +// Tests which don't require parameterization +public class TestUnorderedKVOutput2 { + + @Test(timeout = 5000) + public void testNonStartedOutput() throws Exception { + OutputContext outputContext = OutputTestHelpers.createOutputContext(); + int numPartitions = 1; + UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions); + output.initialize(); + List<Event> events = output.close(); + assertEquals(1, events.size()); + Event event1 = events.get(0); + assertTrue(event1 instanceof DataMovementEvent); + DataMovementEvent dme = (DataMovementEvent) event1; + ByteBuffer bb = dme.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + assertTrue(shufflePayload.hasEmptyPartitions()); + + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload + .getEmptyPartitions()); + BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + assertEquals(numPartitions, emptyPartionsBitSet.cardinality()); + for (int i = 0 ; i < numPartitions ; i++) { + assertTrue(emptyPartionsBitSet.get(i)); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java new file mode 100644 index 0000000..eec4bf5 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; + +import com.google.protobuf.ByteString; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.Test; + +// Tests which don't require parameterization +public class TestUnorderedPartitionedKVOutput2 { + + + @Test(timeout = 5000) + public void testNonStartedOutput() throws Exception { + OutputContext outputContext = OutputTestHelpers.createOutputContext(); + int numPartitions = 1; + UnorderedPartitionedKVOutput output = + new UnorderedPartitionedKVOutput(outputContext, numPartitions); + output.initialize(); + List<Event> events = output.close(); + assertEquals(1, events.size()); + Event event1 = events.get(0); + assertTrue(event1 instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1; + ByteBuffer bb = dme.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + assertTrue(shufflePayload.hasEmptyPartitions()); + + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload + .getEmptyPartitions()); + BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + assertEquals(numPartitions, emptyPartionsBitSet.cardinality()); + for (int i = 0; i < numPartitions; i++) { + assertTrue(emptyPartionsBitSet.get(i)); + } + } +}
