Repository: tez
Updated Branches:
  refs/heads/branch-0.5 0fa38a830 -> f944b5cd6

TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs. (sseth)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f944b5cd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f944b5cd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f944b5cd

Branch: refs/heads/branch-0.5
Commit: f944b5cd6976de074c61b4323a143e26be624a3e
Parents: 0fa38a8
Author: Siddharth Seth <[email protected]>
Authored: Wed Jul 29 18:55:28 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Wed Jul 29 18:55:28 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/mapreduce/input/MRInput.java | 26 +++++++-
 .../tez/mapreduce/input/MultiMRInput.java       |  9 ++-
 .../apache/tez/mapreduce/input/TestMRInput.java | 69 ++++++++++++++++++++
 .../tez/mapreduce/input/TestMultiMRInput.java   | 34 ++++++++++
 5 files changed, 137 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2aa1247..0fe8b58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Release 0.5.5: Unreleased
 INCOMPATIBLE CHANGES
 ALL CHANGES:
+ TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
 TEZ-2600.
When used with HDFS federation(viewfs) ,tez will throw a error Release 0.5.4: 2015-06-26 http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index f38fc9c..4a0006d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -423,7 +423,9 @@ public class MRInput extends MRInputBase { @Override public void start() { - Preconditions.checkState(getNumPhysicalInputs() == 1, "Expecting only 1 physical input for MRInput"); + Preconditions.checkState(getNumPhysicalInputs() == 0 || getNumPhysicalInputs() == 1, + "Expecting 0 or 1 physical input for MRInput"); + LOG.info("MRInput setup to received events" + getNumPhysicalInputs()); } @Private @@ -479,6 +481,24 @@ public class MRInput extends MRInputBase { .checkState(readerCreated == false, "Only a single instance of record reader can be created for this input."); readerCreated = true; + if (getNumPhysicalInputs() == 0) { + return new KeyValueReader() { + @Override + public boolean next() throws IOException { + return false; + } + + @Override + public Object getCurrentKey() throws IOException { + return null; + } + + @Override + public Object getCurrentValue() throws IOException { + return null; + } + }; + } rrLock.lock(); try { if (!mrReader.isSetup()) @@ -492,6 +512,10 @@ public class MRInput extends MRInputBase { @Override public void handleEvents(List<Event> inputEvents) throws Exception { + if (getNumPhysicalInputs() == 0) { + throw new IllegalStateException( + "Unexpected event. 
MRInput has been setup to receive 0 events"); + } if (eventReceived || inputEvents.size() != 1) { throw new IllegalStateException( "MRInput expects only a single input. Received: current eventListSize: " http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index 39dafb8..0f4d0ae 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java @@ -109,6 +109,9 @@ public class MultiMRInput extends MRInputBase { super.initialize(); LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: " + getNumPhysicalInputs()); + if (getNumPhysicalInputs() == 0) { + getContext().inputIsReady(); + } return null; } @@ -139,6 +142,10 @@ public class MultiMRInput extends MRInputBase { public void handleEvents(List<Event> inputEvents) throws Exception { lock.lock(); try { + if (getNumPhysicalInputs() == 0) { + throw new IllegalStateException( + "Unexpected event. MultiMRInput has been setup to receive 0 events"); + } Preconditions.checkState(eventCount.get() + inputEvents.size() <= getNumPhysicalInputs(), "Unexpected event. 
All physical sources already initialized"); for (Event event : inputEvents) { @@ -192,6 +199,6 @@ public class MultiMRInput extends MRInputBase { @Override public void start() throws Exception { - Preconditions.checkState(getNumPhysicalInputs() >= 1, "Expecting one or more physical inputs"); + Preconditions.checkState(getNumPhysicalInputs() >= 0, "Expecting zero or more physical inputs"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java new file mode 100644 index 0000000..61b6f81 --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.mapreduce.input; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputContext; +import org.junit.Test; + +public class TestMRInput { + + @Test(timeout = 5000) + public void test0PhysicalInputs() throws IOException { + InputContext inputContext = mock(InputContext.class); + + DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false), + FileInputFormat.class, "testPath").build(); + + ApplicationId applicationId = ApplicationId.newInstance(1000, 1); + doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload(); + doReturn(applicationId).when(inputContext).getApplicationId(); + doReturn(1).when(inputContext).getTaskIndex(); + doReturn(1).when(inputContext).getTaskAttemptNumber(); + doReturn(new TezCounters()).when(inputContext).getCounters(); + + + MRInput mrInput = new MRInput(inputContext, 0); + + mrInput.initialize(); + + mrInput.start(); + + assertFalse(mrInput.getReader().next()); + + List<Event> events = new LinkedList<Event>(); + try { + mrInput.handleEvents(events); + fail("HandleEvents should cause an input with 0 physical inputs to fail"); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java 
---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index 05f6bbc..33c5233 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -84,6 +85,39 @@ public class TestMultiMRInput { } @Test(timeout = 5000) + public void test0PhysicalInputs() throws Exception { + + Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit"); + JobConf jobConf = new JobConf(defaultConf); + jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class); + FileInputFormat.setInputPaths(jobConf, workDir); + + MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder(); + builder.setGroupingEnabled(false); + builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)); + byte[] payload = builder.build().toByteArray(); + + InputContext inputContext = createTezInputContext(payload); + + + MultiMRInput mMrInput = new MultiMRInput(inputContext, 0); + + mMrInput.initialize(); + + mMrInput.start(); + + assertEquals(0, mMrInput.getKeyValueReaders().size()); + + List<Event> events = new LinkedList<Event>(); + try { + mMrInput.handleEvents(events); + fail("HandleEvents should cause an input with 0 physical inputs to fail"); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } + + @Test(timeout = 5000) public void testSingleSplit() throws Exception { Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
