Repository: tez Updated Branches: refs/heads/master 5de0cd3a4 -> df87125e1
TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/df87125e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/df87125e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/df87125e Branch: refs/heads/master Commit: df87125e166675382e7da7a24c18730351c423a3 Parents: 5de0cd3 Author: Siddharth Seth <[email protected]> Authored: Wed Jul 29 18:50:46 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Jul 29 18:50:46 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 8 +++ .../org/apache/tez/mapreduce/input/MRInput.java | 26 +++++++- .../tez/mapreduce/input/MultiMRInput.java | 9 ++- .../apache/tez/mapreduce/input/TestMRInput.java | 69 ++++++++++++++++++++ .../tez/mapreduce/input/TestMultiMRInput.java | 34 ++++++++++ 5 files changed, 144 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/df87125e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0953aec..8a10fb4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -470,6 +470,14 @@ TEZ-UI CHANGES (TEZ-8): TEZ-1783. Wrapper in standalone mode. TEZ-1820. Fix wrong links. +Release 0.5.5: Unreleased + +INCOMPATIBLE CHANGES + +ALL CHANGES: + TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs. + TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error + Release 0.5.4: 2015-06-26 ALL CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/df87125e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 270f68f..70365cd 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -445,7 +445,9 @@ public class MRInput extends MRInputBase { @Override public void start() { - Preconditions.checkState(getNumPhysicalInputs() == 1, "Expecting only 1 physical input for MRInput"); + Preconditions.checkState(getNumPhysicalInputs() == 0 || getNumPhysicalInputs() == 1, + "Expecting 0 or 1 physical input for MRInput"); + LOG.info("MRInput setup to received {} events", getNumPhysicalInputs()); } @Private @@ -502,6 +504,24 @@ public class MRInput extends MRInputBase { .checkState(readerCreated == false, "Only a single instance of record reader can be created for this input."); readerCreated = true; + if (getNumPhysicalInputs() == 0) { + return new KeyValueReader() { + @Override + public boolean next() throws IOException { + return false; + } + + @Override + public Object getCurrentKey() throws IOException { + return null; + } + + @Override + public Object getCurrentValue() throws IOException { + return null; + } + }; + } rrLock.lock(); try { if (!mrReader.isSetup()) @@ -515,6 +535,10 @@ public class MRInput extends MRInputBase { @Override public void handleEvents(List<Event> inputEvents) throws Exception { + if (getNumPhysicalInputs() == 0) { + throw new IllegalStateException( + "Unexpected event. MRInput has been setup to receive 0 events"); + } if (eventReceived || inputEvents.size() != 1) { throw new IllegalStateException( "MRInput expects only a single input. Received: current eventListSize: " http://git-wip-us.apache.org/repos/asf/tez/blob/df87125e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index 425d737..44d9c96 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java @@ -110,6 +110,9 @@ public class MultiMRInput extends MRInputBase { super.initialize(); LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: " + getNumPhysicalInputs()); + if (getNumPhysicalInputs() == 0) { + getContext().inputIsReady(); + } return null; } @@ -140,6 +143,10 @@ public class MultiMRInput extends MRInputBase { public void handleEvents(List<Event> inputEvents) throws Exception { lock.lock(); try { + if (getNumPhysicalInputs() == 0) { + throw new IllegalStateException( + "Unexpected event. MultiMRInput has been setup to receive 0 events"); + } Preconditions.checkState(eventCount.get() + inputEvents.size() <= getNumPhysicalInputs(), "Unexpected event. All physical sources already initialized"); for (Event event : inputEvents) { @@ -197,6 +204,6 @@ public class MultiMRInput extends MRInputBase { @Override public void start() throws Exception { - Preconditions.checkState(getNumPhysicalInputs() >= 1, "Expecting one or more physical inputs"); + Preconditions.checkState(getNumPhysicalInputs() >= 0, "Expecting zero or more physical inputs"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/df87125e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java new file mode 100644 index 0000000..50114b9 --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.input; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputContext; +import org.junit.Test; + +public class TestMRInput { + + @Test(timeout = 5000) + public void test0PhysicalInputs() throws IOException { + InputContext inputContext = mock(InputContext.class); + + DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false), + FileInputFormat.class, "testPath").build(); + + ApplicationId applicationId = ApplicationId.newInstance(1000, 1); + doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload(); + doReturn(applicationId).when(inputContext).getApplicationId(); + doReturn(1).when(inputContext).getTaskIndex(); + doReturn(1).when(inputContext).getTaskAttemptNumber(); + doReturn(new TezCounters()).when(inputContext).getCounters(); + + + MRInput mrInput = new MRInput(inputContext, 0); + + mrInput.initialize(); + + mrInput.start(); + + assertFalse(mrInput.getReader().next()); + + List<Event> events = new LinkedList<>(); + try { + mrInput.handleEvents(events); + fail("HandleEvents should cause an input with 0 physical inputs to fail"); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/df87125e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index 4031140..80e3e77 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -85,6 +86,39 @@ public class TestMultiMRInput { } @Test(timeout = 5000) + public void test0PhysicalInputs() throws Exception { + + Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit"); + JobConf jobConf = new JobConf(defaultConf); + jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class); + FileInputFormat.setInputPaths(jobConf, workDir); + + MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder(); + builder.setGroupingEnabled(false); + builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)); + byte[] payload = builder.build().toByteArray(); + + InputContext inputContext = createTezInputContext(payload); + + + MultiMRInput mMrInput = new MultiMRInput(inputContext, 0); + + mMrInput.initialize(); + + mMrInput.start(); + + assertEquals(0, mMrInput.getKeyValueReaders().size()); + + List<Event> events = new LinkedList<>(); + try { + mMrInput.handleEvents(events); + fail("HandleEvents should cause an input with 0 physical inputs to fail"); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } + + @Test(timeout = 5000) public void testSingleSplit() throws Exception { Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
