Repository: tez Updated Branches: refs/heads/master cb3338e86 -> 65265de17
TEZ-1257. Error on empty partition when using OnFileUnorderedKVOutput and ShuffledMergedInput Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/65265de1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/65265de1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/65265de1 Branch: refs/heads/master Commit: 65265de17b2356dfbe6bc3e1a54804cd9eb3850b Parents: cb3338e Author: Rajesh Balamohan <[email protected]> Authored: Tue Jul 29 06:38:35 2014 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Jul 29 06:38:35 2014 +0530 ---------------------------------------------------------------------- .../shuffle/impl/ShuffleInputEventHandler.java | 18 +- .../library/shuffle/common/ShuffleUtils.java | 14 ++ .../impl/ShuffleInputEventHandlerImpl.java | 15 +- .../impl/TestShuffleInputEventHandler.java | 171 +++++++++++++++++++ 4 files changed, 198 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java index ac88865..6fdf65e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.BitSet; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tez.common.TezCommonUtils; @@ -78,11 +79,8 @@ public class ShuffleInputEventHandler { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } int partitionId = dmEvent.getSourceIndex(); - URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId); - InputAttemptIdentifier srcAttemptIdentifier = - new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent()); - LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier); - + LOG.info("DataMovementEvent partitionId:" + partitionId + ", targetIndex: " + dmEvent.getTargetIndex() + + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload)); // TODO NEWTEZ See if this duration hack can be removed. int duration = shufflePayload.getRunDuration(); if (duration > maxMapRuntime) { @@ -94,6 +92,8 @@ public class ShuffleInputEventHandler { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions()); BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions); if (emptyPartitionsBitSet.get(partitionId)) { + InputAttemptIdentifier srcAttemptIdentifier = + new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion()); LOG.info("Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + srcAttemptIdentifier + "]. Not fetching."); scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null); @@ -104,7 +104,10 @@ public class ShuffleInputEventHandler { "the empty partition to succeeded", e); } } - scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(), + URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId); + InputAttemptIdentifier srcAttemptIdentifier = + new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent()); + scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(), partitionId, baseUri.toString(), srcAttemptIdentifier); } @@ -115,7 +118,8 @@ public class ShuffleInputEventHandler { } // TODO NEWTEZ Handle encrypted shuffle - private URI getBaseURI(String host, int port, int partitionId) { + @VisibleForTesting + URI getBaseURI(String host, int port, int partitionId) { StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, partitionId, inputContext.getApplicationId().toString(), sslShuffle); URI u = URI.create(sb.toString()); http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java index c7b1dde..44b9a3b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java @@ -44,6 +44,7 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams; import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParamsBuilder; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; public class ShuffleUtils { @@ -220,5 +221,18 @@ public class ShuffleUtils { return builder.build(); } + + public static String stringify(DataMovementEventPayloadProto dmProto) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + sb.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", "); + sb.append("host: " + dmProto.getHost()).append(", "); + sb.append("port: " + dmProto.getPort()).append(", "); + sb.append("pathComponent: " + dmProto.getPathComponent()).append(", "); + sb.append("runDuration: " + dmProto.getRunDuration()).append(", "); + sb.append("hasDataInEvent: " + dmProto.hasData()); + sb.append("]"); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java index 65dba32..83d9502 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java @@ -99,7 +99,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { LOG.info("Processing DataMovementEvent with srcIndex: " + srcIndex + ", targetIndex: " + dme.getTargetIndex() + ", attemptNum: " + dme.getVersion() + ", payload: " - + stringify(shufflePayload)); + + ShuffleUtils.stringify(shufflePayload)); if (shufflePayload.hasEmptyPartitions()) { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload @@ -154,17 +154,6 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion()); shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); } - - private String stringify(DataMovementEventPayloadProto dmProto) { - StringBuilder sb = new StringBuilder(); - sb.append("["); - sb.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", "); - sb.append("host: " + dmProto.getHost()).append(", "); - sb.append("port: " + dmProto.getPort()).append(", "); - sb.append("pathComponent: " + dmProto.getPathComponent()).append(", "); - sb.append("runDuration: " + dmProto.getRunDuration()).append(", "); - sb.append("hasDataInEvent: " + dmProto.hasData()); - return sb.toString(); - } + } http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java new file mode 100644 index 0000000..f921567 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java @@ -0,0 +1,171 @@ +package org.apache.tez.runtime.library.common.shuffle.impl; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtils; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezInputContext; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputFailedEvent; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class TestShuffleInputEventHandler { + + private static final String HOST = "localhost"; + private static final int PORT = 8080; + private static final String PATH_COMPONENT = "attempt"; + + private ShuffleInputEventHandler handler; + private ShuffleScheduler scheduler; + + private TezInputContext createTezInputContext() { + ApplicationId applicationId = ApplicationId.newInstance(1, 1); + TezInputContext inputContext = mock(TezInputContext.class); + doReturn(applicationId).when(inputContext).getApplicationId(); + return inputContext; + } + + private Event createDataMovementEvent(int srcIndex, int targetIndex, + ByteString emptyPartitionByteString, boolean allPartitionsEmpty) { + ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder = + ShuffleUserPayloads.DataMovementEventPayloadProto + .newBuilder(); + if (!allPartitionsEmpty) { + builder.setHost(HOST); + builder.setPort(PORT); + builder.setPathComponent(PATH_COMPONENT); + } + builder.setRunDuration(10); + if (emptyPartitionByteString != null) { + builder.setEmptyPartitions(emptyPartitionByteString); + } + return new DataMovementEvent(srcIndex, targetIndex, 0, builder.build().toByteArray()); + } + + @Before + public void setup() throws Exception { + TezInputContext inputContext = createTezInputContext(); + scheduler = mock(ShuffleScheduler.class); + handler = new ShuffleInputEventHandler(inputContext, scheduler, false); + } + + @Test + public void basicTest() { + List<Event> events = new LinkedList<Event>(); + int srcIdx = 0; + int targetIdx = 1; + Event dme = createDataMovementEvent(srcIdx, targetIdx, null, false); + events.add(dme); + handler.handleEvents(events); + InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0, + PATH_COMPONENT); + + String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString(); + int partitionId = srcIdx; + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), + eq(baseUri), eq(expectedIdentifier)); + } + + @Test + public void testFailedEvent() { + List<Event> events = new LinkedList<Event>(); + int targetIdx = 1; + InputFailedEvent failedEvent = new InputFailedEvent(targetIdx, 0); + events.add(failedEvent); + handler.handleEvents(events); + InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0); + verify(scheduler).obsoleteInput(eq(expectedIdentifier)); + } + + @Test + public void testAllPartitionsEmpty() throws IOException { + List<Event> events = new LinkedList<Event>(); + int srcIdx = 0; + int targetIdx = 1; + Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx) + , true); + events.add(dme); + handler.handleEvents(events); + + InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0); + verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l), + eq(0l), eq(0l), any(MapOutput.class)); + } + + @Test + public void testCurrentPartitionEmpty() throws IOException { + List<Event> events = new LinkedList<Event>(); + int srcIdx = 0; + int targetIdx = 1; + Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx) + , false); + events.add(dme); + handler.handleEvents(events); + + InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0); + + verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l), + eq(0l), eq(0l), any(MapOutput.class)); + } + + @Test + public void testOtherPartitionEmpty() throws IOException { + List<Event> events = new LinkedList<Event>(); + int srcIdx = 0; + int taskIndex = 1; + Event dme = createDataMovementEvent(srcIdx, taskIndex, createEmptyPartitionByteString(100), + false); + events.add(dme); + handler.handleEvents(events); + + String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString(); + int partitionId = srcIdx; + InputAttemptIdentifier expectedIdentifier = + new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT); + + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), + eq(expectedIdentifier)); + } + + private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException { + BitSet bitSet = new BitSet(); + for (int i : emptyPartitions) { + bitSet.set(i); + } + return TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(bitSet)); + } + +}
