Repository: tez Updated Branches: refs/heads/branch-0.6 60dc76c8f -> 71fa843c0
TEZ-2237. Valid events should be sent out when an Output is not started. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/71fa843c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/71fa843c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/71fa843c Branch: refs/heads/branch-0.6 Commit: 71fa843c0c70abd45c35d95b056c90f4ade258cf Parents: 60dc76c Author: Siddharth Seth <[email protected]> Authored: Mon May 4 16:31:12 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon May 4 16:31:12 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/shuffle/ShuffleUtils.java | 64 ++++++++++++++++ .../output/OrderedPartitionedKVOutput.java | 15 +++- .../output/UnorderedPartitionedKVOutput.java | 11 ++- .../library/output/OutputTestHelpers.java | 49 ++++++++++++ .../output/TestOrderedPartitionedKVOutput2.java | 67 +++++++++++++++++ .../library/output/TestUnorderedKVOutput2.java | 79 ++++++++++++++++++++ .../TestUnorderedPartitionedKVOutput2.java | 62 +++++++++++++++ 8 files changed, 344 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 24c5f51..dc9a42e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2237. Valid events should be sent out when an Output is not started. TEZ-2399. Tez UI: add proper dependencies for computed properties TEZ-1988. Tez UI: does not work when using file:// in a browser TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter. http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index fb929e5..f686f1f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -24,10 +24,12 @@ import java.io.OutputStream; import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; +import java.util.BitSet; import java.util.List; import javax.crypto.SecretKey; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,13 +38,21 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams; import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParamsBuilder; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; public class ShuffleUtils { @@ -220,6 +230,60 @@ public class ShuffleUtils { return builder.build(); } + /** + * Generate events for outputs which have not been started. + * @param eventList + * @param numPhysicalOutputs + * @param context + * @param generateVmEvent whether to generate a vm event or not + * @param isCompositeEvent whether to generate a CompositeDataMovementEvent or a DataMovementEvent + * @throws IOException + */ + public static void generateEventsForNonStartedOutput(List<Event> eventList, + int numPhysicalOutputs, + OutputContext context, + boolean generateVmEvent, + boolean isCompositeEvent) throws + IOException { + DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto + .newBuilder(); + + + // Construct the VertexManager event if required. + if (generateVmEvent) { + ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = + ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder(); + vmBuilder.setOutputSize(0); + VertexManagerEvent vmEvent = VertexManagerEvent.create( + context.getDestinationVertexName(), + vmBuilder.build().toByteString().asReadOnlyByteBuffer()); + eventList.add(vmEvent); + } + + // Construct the DataMovementEvent + // Always set empty partition information since no files were generated. + LOG.info("Setting all " + numPhysicalOutputs + " partitions as empty for non-started output: "); + BitSet emptyPartitionDetails = new BitSet(numPhysicalOutputs); + emptyPartitionDetails.set(0, numPhysicalOutputs, true); + ByteString emptyPartitionsBytesString = + TezCommonUtils.compressByteArrayToByteString( + TezUtilsInternal.toByteArray(emptyPartitionDetails)); + payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString); + payloadBuilder.setRunDuration(0); + DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); + ByteBuffer dmePayload = payloadProto.toByteString().asReadOnlyByteBuffer(); + + + if (isCompositeEvent) { + CompositeDataMovementEvent cdme = + CompositeDataMovementEvent.create(0, numPhysicalOutputs, dmePayload); + eventList.add(cdme); + } else { + DataMovementEvent dme = DataMovementEvent.create(0, dmePayload); + eventList.add(dme); + } + } + public static String stringify(DataMovementEventPayloadProto dmProto) { StringBuilder sb = new StringBuilder(); sb.append("["); http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index b3290a5..bcdd736 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -147,9 +148,12 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { this.endTime = System.nanoTime(); return generateEventsOnClose(); } else { - LOG.warn("Attempting to close output " + getContext().getDestinationVertexName() - + " before it was started"); - return Collections.emptyList(); + LOG.warn( + "Attempting to close output " + getContext().getDestinationVertexName() + " of type " + + this.getClass().getSimpleName() + " before it was started. Generating empty events"); + + List<Event> returnEvents = generateEmptyEvents(); + return returnEvents; } } @@ -214,6 +218,11 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { return events; } + private List<Event> generateEmptyEvents() throws IOException { + List<Event> eventList = Lists.newLinkedList(); + ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true); + return eventList; + } private static final Set<String> confKeys = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 1e39535..755dfb8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.output; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,6 +42,7 @@ import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter; /** @@ -100,7 +102,14 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { if (isStarted.get()) { return kvWriter.close(); } else { - return Collections.emptyList(); + LOG.warn( + "Attempting to close output " + getContext().getDestinationVertexName() + " of type " + + this.getClass().getSimpleName() + " before it was started. Generating empty events"); + List<Event> returnEvents = new LinkedList<Event>(); + ShuffleUtils + .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(), + false, true); + return returnEvents; } } http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java new file mode 100644 index 0000000..0f6e2ca --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.OutputContext; + +public class OutputTestHelpers { + static OutputContext createOutputContext(@Nullable Path workingDir) throws IOException { + OutputContext outputContext = mock(OutputContext.class); + Configuration conf = new TezConfiguration(); + UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); + String workDirString = workingDir == null ? "workDir" : workingDir.toString(); + String[] workingDirs = new String[]{workDirString}; + TezCounters counters = new TezCounters(); + + doReturn("destinationVertex").when(outputContext).getDestinationVertexName(); + doReturn(payLoad).when(outputContext).getUserPayload(); + doReturn(workingDirs).when(outputContext).getWorkDirs(); + doReturn(200 * 1024 * 1024l).when(outputContext).getTotalMemoryAvailableToTask(); + doReturn(counters).when(outputContext).getCounters(); + doReturn(UUID.randomUUID().toString()).when(outputContext).getUniqueIdentifier(); + return outputContext; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java new file mode 100644 index 0000000..2dd895f --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; + +import com.google.protobuf.ByteString; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.Test; + +// Tests which don't require parameterization +public class TestOrderedPartitionedKVOutput2 { + + + @Test(timeout = 5000) + public void testNonStartedOutput() throws IOException { + OutputContext outputContext = OutputTestHelpers.createOutputContext(null); + int numPartitions = 10; + OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions); + output.initialize(); + List<Event> events = output.close(); + assertEquals(2, events.size()); + Event event1 = events.get(0); + assertTrue(event1 instanceof VertexManagerEvent); + Event event2 = events.get(1); + assertTrue(event2 instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2; + ByteBuffer bb = cdme.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + assertTrue(shufflePayload.hasEmptyPartitions()); + + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload + .getEmptyPartitions()); + BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + assertEquals(numPartitions, emptyPartionsBitSet.cardinality()); + for (int i = 0 ; i < numPartitions ; i++) { + assertTrue(emptyPartionsBitSet.get(i)); + } + } + + +} http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java new file mode 100644 index 0000000..fc0d996 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +// Tests which don't require parameterization +public class TestUnorderedKVOutput2 { + + private FileSystem fs; + Path workingDir = new Path(".", this.getClass().getName()); + + @Before + public void setUp() throws Exception { + fs = FileSystem.getLocal(new Configuration()); + } + + @After + public void cleanup() throws IOException { + fs.delete(workingDir, true); + } + + @Test(timeout = 5000) + public void testNonStartedOutput() throws Exception { + OutputContext outputContext = OutputTestHelpers.createOutputContext(workingDir); + int numPartitions = 1; + UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions); + output.initialize(); + List<Event> events = output.close(); + assertEquals(1, events.size()); + Event event1 = events.get(0); + assertTrue(event1 instanceof DataMovementEvent); + DataMovementEvent dme = (DataMovementEvent) event1; + ByteBuffer bb = dme.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + assertTrue(shufflePayload.hasEmptyPartitions()); + + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload + .getEmptyPartitions()); + BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + assertEquals(numPartitions, emptyPartionsBitSet.cardinality()); + for (int i = 0 ; i < numPartitions ; i++) { + assertTrue(emptyPartionsBitSet.get(i)); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/71fa843c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java new file mode 100644 index 0000000..726ff08 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; + +import com.google.protobuf.ByteString; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.Test; + +// Tests which don't require parameterization +public class TestUnorderedPartitionedKVOutput2 { + + + @Test(timeout = 5000) + public void testNonStartedOutput() throws Exception { + OutputContext outputContext = OutputTestHelpers.createOutputContext(null); + int numPartitions = 1; + UnorderedPartitionedKVOutput output = + new UnorderedPartitionedKVOutput(outputContext, numPartitions); + output.initialize(); + List<Event> events = output.close(); + assertEquals(1, events.size()); + Event event1 = events.get(0); + assertTrue(event1 instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1; + ByteBuffer bb = dme.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + assertTrue(shufflePayload.hasEmptyPartitions()); + + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload + .getEmptyPartitions()); + BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + assertEquals(numPartitions, emptyPartionsBitSet.cardinality()); + for (int i = 0; i < numPartitions; i++) { + assertTrue(emptyPartionsBitSet.get(i)); + } + } +}
