Repository: tez
Updated Branches:
  refs/heads/branch-0.5 0fa38a830 -> f944b5cd6


TEZ-2636. MRInput and MultiMRInput should work for cases when there are
0 physical inputs. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f944b5cd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f944b5cd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f944b5cd

Branch: refs/heads/branch-0.5
Commit: f944b5cd6976de074c61b4323a143e26be624a3e
Parents: 0fa38a8
Author: Siddharth Seth <[email protected]>
Authored: Wed Jul 29 18:55:28 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Wed Jul 29 18:55:28 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/mapreduce/input/MRInput.java | 26 +++++++-
 .../tez/mapreduce/input/MultiMRInput.java       |  9 ++-
 .../apache/tez/mapreduce/input/TestMRInput.java | 69 ++++++++++++++++++++
 .../tez/mapreduce/input/TestMultiMRInput.java   | 34 ++++++++++
 5 files changed, 137 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2aa1247..0fe8b58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Release 0.5.5: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
   TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
 
 Release 0.5.4: 2015-06-26

http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index f38fc9c..4a0006d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -423,7 +423,9 @@ public class MRInput extends MRInputBase {
 
   @Override
   public void start() {
-    Preconditions.checkState(getNumPhysicalInputs() == 1, "Expecting only 1 physical input for MRInput");
+    Preconditions.checkState(getNumPhysicalInputs() == 0 || getNumPhysicalInputs() == 1,
+        "Expecting 0 or 1 physical input for MRInput");
+    LOG.info("MRInput setup to received events" + getNumPhysicalInputs());
   }
 
   @Private
@@ -479,6 +481,24 @@ public class MRInput extends MRInputBase {
         .checkState(readerCreated == false,
             "Only a single instance of record reader can be created for this input.");
     readerCreated = true;
+    if (getNumPhysicalInputs() == 0) {
+      return new KeyValueReader() {
+        @Override
+        public boolean next() throws IOException {
+          return false;
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException {
+          return null;
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException {
+          return null;
+        }
+      };
+    }
     rrLock.lock();
     try {
       if (!mrReader.isSetup())
@@ -492,6 +512,10 @@ public class MRInput extends MRInputBase {
 
   @Override
   public void handleEvents(List<Event> inputEvents) throws Exception {
+    if (getNumPhysicalInputs() == 0) {
+      throw new IllegalStateException(
+          "Unexpected event. MRInput has been setup to receive 0 events");
+    }
     if (eventReceived || inputEvents.size() != 1) {
       throw new IllegalStateException(
           "MRInput expects only a single input. Received: current eventListSize: "

http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 39dafb8..0f4d0ae 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -109,6 +109,9 @@ public class MultiMRInput extends MRInputBase {
     super.initialize();
     LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: "
         + getNumPhysicalInputs());
+    if (getNumPhysicalInputs() == 0) {
+      getContext().inputIsReady();
+    }
     return null;
   }
 
@@ -139,6 +142,10 @@ public class MultiMRInput extends MRInputBase {
   public void handleEvents(List<Event> inputEvents) throws Exception {
     lock.lock();
     try {
+      if (getNumPhysicalInputs() == 0) {
+        throw new IllegalStateException(
+            "Unexpected event. MultiMRInput has been setup to receive 0 events");
+      }
       Preconditions.checkState(eventCount.get() + inputEvents.size() <= getNumPhysicalInputs(),
           "Unexpected event. All physical sources already initialized");
       for (Event event : inputEvents) {
@@ -192,6 +199,6 @@ public class MultiMRInput extends MRInputBase {
 
   @Override
   public void start() throws Exception {
-    Preconditions.checkState(getNumPhysicalInputs() >= 1, "Expecting one or more physical inputs");
+    Preconditions.checkState(getNumPhysicalInputs() >= 0, "Expecting zero or more physical inputs");
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
new file mode 100644
index 0000000..61b6f81
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.junit.Test;
+
+public class TestMRInput {
+
+  @Test(timeout = 5000)
+  public void test0PhysicalInputs() throws IOException {
+    InputContext inputContext = mock(InputContext.class);
+
+    DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false),
+        FileInputFormat.class, "testPath").build();
+
+    ApplicationId applicationId = ApplicationId.newInstance(1000, 1);
+    doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload();
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn(1).when(inputContext).getTaskIndex();
+    doReturn(1).when(inputContext).getTaskAttemptNumber();
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+
+
+    MRInput mrInput = new MRInput(inputContext, 0);
+
+    mrInput.initialize();
+
+    mrInput.start();
+
+    assertFalse(mrInput.getReader().next());
+
+    List<Event> events = new LinkedList<Event>();
+    try {
+      mrInput.handleEvents(events);
+      fail("HandleEvents should cause an input with 0 physical inputs to fail");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalStateException);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f944b5cd/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 05f6bbc..33c5233 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
@@ -84,6 +85,39 @@ public class TestMultiMRInput {
   }
 
   @Test(timeout = 5000)
+  public void test0PhysicalInputs() throws Exception {
+
+    Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
+    JobConf jobConf = new JobConf(defaultConf);
+    jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, workDir);
+
+    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+    builder.setGroupingEnabled(false);
+    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
+    byte[] payload = builder.build().toByteArray();
+
+    InputContext inputContext = createTezInputContext(payload);
+
+
+    MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);
+
+    mMrInput.initialize();
+
+    mMrInput.start();
+
+    assertEquals(0, mMrInput.getKeyValueReaders().size());
+
+    List<Event> events = new LinkedList<Event>();
+    try {
+      mMrInput.handleEvents(events);
+      fail("HandleEvents should cause an input with 0 physical inputs to fail");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalStateException);
+    }
+  }
+
+  @Test(timeout = 5000)
   public void testSingleSplit() throws Exception {
 
     Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");

Reply via email to