Repository: tez Updated Branches: refs/heads/master cd0fc63e1 -> bec90035b
TEZ-1324. OnFileSortedOutput: send host/port/pathComponent details only when one of the partitions has data Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bec90035 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bec90035 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bec90035 Branch: refs/heads/master Commit: bec90035b7597a48607299e7ae2cc881086f50e7 Parents: cd0fc63 Author: Rajesh Balamohan <[email protected]> Authored: Wed Jul 30 04:20:38 2014 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Jul 30 04:20:38 2014 +0530 ---------------------------------------------------------------------- .../library/output/OnFileSortedOutput.java | 11 +- .../library/output/TestOnFileSortedOutput.java | 263 +++++++++++++++++++ 2 files changed, 271 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bec90035/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java index 240d2d6..f10cb20 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java @@ -154,6 +154,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput { DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); + boolean outputGenerated = true; if (sendEmptyPartitionDetails) { Path indexFile = sorter.getMapOutput().getOutputIndexFile(); TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf); @@ -166,6 +167,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput { emptyPartitions++; } } + outputGenerated = (spillRecord.size() != emptyPartitions); if (emptyPartitions > 0) { ByteString emptyPartitionsBytesString = TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails)); @@ -175,9 +177,12 @@ public class OnFileSortedOutput extends AbstractLogicalOutput { + ", compressedSize=" + emptyPartitionsBytesString.size()); } } - payloadBuilder.setHost(host); - payloadBuilder.setPort(shufflePort); - payloadBuilder.setPathComponent(getContext().getUniqueIdentifier()); + if (!sendEmptyPartitionDetails || outputGenerated) { + payloadBuilder.setHost(host); + payloadBuilder.setPort(shufflePort); + payloadBuilder.setPathComponent(getContext().getUniqueIdentifier()); + } + payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000)); DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); byte[] payloadBytes = payloadProto.toByteArray(); http://git-wip-us.apache.org/repos/asf/tez/blob/bec90035/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java new file mode 100644 index 0000000..40447aa --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -0,0 +1,263 @@ +package org.apache.tez.runtime.library.output; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.utils.EnvironmentUpdateUtils; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.MemoryUpdateCallback; +import org.apache.tez.runtime.api.TezOutputContext; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.library.api.KeyValuesWriter; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@RunWith(Parameterized.class) +public class TestOnFileSortedOutput { + private static final Random rnd = new Random(); + private static final String UniqueID = "UUID"; + private static final String HOST = "localhost"; + private static final int PORT = 80; + + private Configuration conf; + private FileSystem fs; + private Path workingDir; + //no of outputs + private int partitions; + //For sorter (pipelined / Default) + private int sorterThreads; + + private KeyValuesWriter writer; + private OnFileSortedOutput sortedOutput; + private boolean sendEmptyPartitionViaEvent; + //Partition index for which data should not be written to. + private int emptyPartitionIdx; + + /** + * Constructor + * + * @param sendEmptyPartitionViaEvent + * @param threads number of threads needed for sorter (pipelinedsorter or default sorter) + * @param emptyPartitionIdx for which data should not be generated + */ + public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, int threads, + int emptyPartitionIdx) throws IOException { + this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent; + this.emptyPartitionIdx = emptyPartitionIdx; + this.sorterThreads = threads; + + conf = new Configuration(); + + workingDir = new Path(".", this.getClass().getName()); + String localDirs = workingDir.toString(); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); + fs = FileSystem.getLocal(conf); + } + + @Before + public void setup() throws Exception { + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, sorterThreads); + + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, + HashPartitioner.class.getName()); + + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, + sendEmptyPartitionViaEvent); + + EnvironmentUpdateUtils.put(ApplicationConstants.Environment.NM_HOST.toString(), HOST); + fs.mkdirs(workingDir); + this.partitions = Math.max(1, rnd.nextInt(10)); + } + + @After + public void cleanup() throws IOException { + fs.delete(workingDir, true); + } + + @Parameterized.Parameters(name = "test[{0}, {1}, {2}]") + public static Collection<Object[]> getParameters() { + Collection<Object[]> parameters = new ArrayList<Object[]>(); + //empty_partition_via_events_enabled, noOfSortThreads, partitionToBeEmpty + parameters.add(new Object[] { false, 1, -1 }); + parameters.add(new Object[] { false, 1, 0 }); + parameters.add(new Object[] { true, 1, -1 }); + parameters.add(new Object[] { true, 1, 0 }); + + //Pipelined sorter + parameters.add(new Object[] { false, 2, -1 }); + parameters.add(new Object[] { false, 2, 0 }); + parameters.add(new Object[] { true, 2, -1 }); + parameters.add(new Object[] { true, 2, 0 }); + + return parameters; + } + + private void startSortedOutput(int partitions) throws Exception { + sortedOutput = new OnFileSortedOutput(); + sortedOutput.setNumPhysicalOutputs(partitions); + TezOutputContext context = createTezOutputContext(); + sortedOutput.initialize(context); + sortedOutput.start(); + writer = sortedOutput.getWriter(); + } + + @Test + public void baseTest() throws Exception { + startSortedOutput(partitions); + + //Write random set of keys + for (int i = 0; i < Math.max(1, rnd.nextInt(50)); i++) { + Text key = new Text(new BigInteger(256, rnd).toString()); + LinkedList values = new LinkedList(); + for (int j = 0; j < Math.max(2, rnd.nextInt(10)); j++) { + values.add(new Text(new BigInteger(256, rnd).toString())); + } + writer.write(key, values); + } + + List<Event> eventList = sortedOutput.close(); + assertTrue(eventList != null && eventList.size() == 2); + + ShuffleUserPayloads.DataMovementEventPayloadProto + payload = ShuffleUserPayloads.DataMovementEventPayloadProto + .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()); + + assertEquals(HOST, payload.getHost()); + assertEquals(PORT, payload.getPort()); + assertEquals(UniqueID, payload.getPathComponent()); + } + + @Test + public void testWithSomeEmptyPartition() throws Exception { + //ensure atleast 2 partitions are available + partitions = Math.max(2, partitions); + startSortedOutput(partitions); + + //write random data + for (int i = 0; i < 2 * partitions; i++) { + Text key = new Text(new BigInteger(256, rnd).toString()); + Text value = new Text(new BigInteger(256, rnd).toString()); + //skip writing to certain partitions + if (i % partitions != emptyPartitionIdx) { + writer.write(key, value); + } + } + + List<Event> eventList = sortedOutput.close(); + assertTrue(eventList != null && eventList.size() == 2); + + ShuffleUserPayloads.DataMovementEventPayloadProto + payload = ShuffleUserPayloads.DataMovementEventPayloadProto + .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()); + + assertEquals(HOST, payload.getHost()); + assertEquals(PORT, payload.getPort()); + assertEquals(UniqueID, payload.getPathComponent()); + } + + @Test + public void testAllEmptyPartition() throws Exception { + startSortedOutput(partitions); + + //Close output without writing any data to it. + List<Event> eventList = sortedOutput.close(); + assertTrue(eventList != null && eventList.size() == 2); + + ShuffleUserPayloads.DataMovementEventPayloadProto + payload = ShuffleUserPayloads.DataMovementEventPayloadProto + .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()); + if (sendEmptyPartitionViaEvent) { + assertEquals("", payload.getHost()); + assertEquals(0, payload.getPort()); + assertEquals("", payload.getPathComponent()); + } else { + assertEquals(HOST, payload.getHost()); + assertEquals(PORT, payload.getPort()); + assertEquals(UniqueID, payload.getPathComponent()); + } + } + + private TezOutputContext createTezOutputContext() throws IOException { + String[] workingDirs = { workingDir.toString() }; + byte[] payLoad = TezUtils.createUserPayloadFromConf(conf); + DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer(); + serviceProviderMetaData.writeInt(PORT); + + TezCounters counters = new TezCounters(); + + TezOutputContext context = mock(TezOutputContext.class); + doReturn(counters).when(context).getCounters(); + doReturn(workingDirs).when(context).getWorkDirs(); + doReturn(payLoad).when(context).getUserPayload(); + doReturn(100 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask(); + doReturn(UniqueID).when(context).getUniqueIdentifier(); + doReturn("v1").when(context).getDestinationVertexName(); + doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context) + .getServiceProviderMetaData + (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { + long requestedSize = (Long) invocation.getArguments()[0]; + MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation + .getArguments()[1]; + callback.memoryAssigned(requestedSize); + return null; + } + }).when(context).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class)); + return context; + } + +}
