Author: gunther
Date: Fri Oct  4 04:02:12 2013
New Revision: 1529076

URL: http://svn.apache.org/r1529076
Log:
HIVE-5390: custom LogicalIOProcessor - reduce record processor (Thejas Nair via Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Oct  4 04:02:12 2013
@@ -78,8 +78,6 @@ import org.apache.tez.mapreduce.hadoop.M
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 
 /**
@@ -233,7 +231,7 @@ public class DagUtils {
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     if (inputSplitInfo.getNumTasks() != 0) {
       map = new Vertex(mapWork.getName(),
-          new ProcessorDescriptor(TezProcessor.class.getName()).
+          new ProcessorDescriptor(MapTezProcessor.class.getName()).
                setUserPayload(serializedConf),
           inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
       Map<String, String> environment = new HashMap<String, String>();
@@ -313,7 +311,7 @@ public class DagUtils {
 
     // create the vertex
     Vertex reducer = new Vertex(reduceWork.getName(),
-        new ProcessorDescriptor(ReduceProcessor.class.getName()).
+        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
              setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
         reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
 

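For context: the two hunks above swap Tez's generic MapProcessor/ReduceProcessor for the Hive-side subclasses added in this commit. Condensed, the vertex wiring now looks like this (all identifiers as in the surrounding DagUtils code):

    // Map vertex runs Hive's MapTezProcessor, configured via the serialized JobConf.
    map = new Vertex(mapWork.getName(),
        new ProcessorDescriptor(MapTezProcessor.class.getName()).setUserPayload(serializedConf),
        inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));

    // Reduce vertex follows the same pattern with ReduceTezProcessor.
    Vertex reducer = new Vertex(reduceWork.getName(),
        new ProcessorDescriptor(ReduceTezProcessor.class.getName())
            .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
        reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
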
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Oct  4 04:02:12 2013
@@ -18,10 +18,12 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -45,26 +47,36 @@ import org.apache.tez.runtime.library.ap
  */
 public class MapRecordProcessor  extends RecordProcessor{
 
-  private static final String PLAN_KEY = "__MAP_PLAN__";
+
   private MapOperator mapOp;
   public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
   private final ExecMapperContext execContext = new ExecMapperContext();
   private boolean abort = false;
+  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
 
   @Override
-  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
     super.init(jconf, mrReporter, inputs, out);
 
+    //Update JobConf using MRInput; info such as the input file name comes via this
+    MRInput mrInput = getMRInput(inputs);
+    Configuration updatedConf = mrInput.getConfigUpdates();
+    if (updatedConf != null) {
+      for (Entry<String, String> entry : updatedConf) {
+        jconf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
 
       execContext.setJc(jconf);
       // create map and fetch operators
-      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
+      MapWork mrwork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
       if (mrwork == null) {
         mrwork = Utilities.getMapWork(jconf);
-        cache.cache(PLAN_KEY, mrwork);
+        cache.cache(MAP_PLAN_KEY, mrwork);
       }
       mapOp = new MapOperator();
 
@@ -94,6 +106,21 @@ public class MapRecordProcessor  extends
     }
   }
 
+  private MRInput getMRInput(Map<String, LogicalInput> inputs) {
+    //there should be only one MRInput
+    MRInput theMRInput = null;
+    for(LogicalInput inp : inputs.values()){
+      if(inp instanceof MRInput){
+        if(theMRInput != null){
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        //a better approach would be to find the alias
+        theMRInput = (MRInput)inp;
+      }
+    }
+    return theMRInput;
+  }
+
   @Override
   void run() throws IOException{
     if (inputs.size() != 1) {
@@ -101,7 +128,7 @@ public class MapRecordProcessor  extends
           + ", inputCount=" + inputs.size());
     }
 
-    MRInput in = (MRInput)inputs.iterator().next();
+    MRInput in = getMRInput(inputs);
     KeyValueReader reader = in.getReader();
 
     //process records until done

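One detail worth calling out in the init() change above: Hadoop's Configuration is Iterable over its key/value entries, which is what lets the updates reported by MRInput.getConfigUpdates() be copied straight into the task's JobConf. A self-contained sketch of the idiom (class and method names here are illustrative, not part of the commit):

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    public class ConfigUpdateSketch {
      // Copy every entry of updatedConf into jconf, mirroring what
      // MapRecordProcessor.init() does with MRInput.getConfigUpdates().
      static void apply(JobConf jconf, Configuration updatedConf) {
        if (updatedConf != null) { // MRInput may have no updates to report
          for (Map.Entry<String, String> entry : updatedConf) {
            jconf.set(entry.getKey(), entry.getValue());
          }
        }
      }
    }
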
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1529076&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Fri Oct  4 04:02:12 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * Subclass of TezProcessor that marks this processor as running the map-side plan
+ */
+public class MapTezProcessor extends TezProcessor {
+  public MapTezProcessor(){
+    super(true);
+  }
+}

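The class above is deliberately trivial: Tez instantiates a processor reflectively from the class name recorded in the ProcessorDescriptor, so it needs a public no-arg constructor, and the map/reduce flag has to be baked into a dedicated subclass rather than passed as a constructor argument. The pattern in isolation (Base and MapVariant are illustrative names, not Hive classes):

    // The base class carries the flag; reflection-friendly subclasses pin it down.
    abstract class Base {
      protected final boolean isMap;
      protected Base(boolean isMap) { this.isMap = isMap; }
    }

    // Instantiable via Class.forName(name).newInstance(), unlike Base itself.
    class MapVariant extends Base {
      public MapVariant() { super(true); }
    }
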
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Fri Oct  4 04:02:12 2013
@@ -21,7 +21,7 @@ import java.lang.management.ManagementFa
 import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,7 +37,7 @@ import org.apache.tez.runtime.api.Logica
 public abstract class RecordProcessor  {
 
   protected JobConf jconf;
-  protected Collection<LogicalInput> inputs;
+  protected Map<String, LogicalInput> inputs;
   protected OutputCollector out;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -59,7 +59,7 @@ public abstract class RecordProcessor  {
    * @param inputs
    * @param out
    */
-  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
     this.jconf = jconf;
     this.reporter = mrReporter;

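The switch from Collection<LogicalInput> to Map<String, LogicalInput> preserves the name each input is registered under, so a processor can look up a specific input instead of iterating blindly; that matters once a vertex has several inputs, as the join TODO in ReduceRecordProcessor anticipates. For example (a hypothetical fragment; "Map 1" is an invented input name, not from this commit):

    // Fetch the input wired up under a known name.
    LogicalInput in = inputs.get("Map 1"); // hypothetical name
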
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1529076&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Fri Oct  4 04:02:12 2013
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+/**
+ * Process input from tez LogicalInput and write output - for a reduce plan
+ * Just pump the records through the query plan.
+ */
+public class ReduceRecordProcessor  extends RecordProcessor{
+  private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
+
+  public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
+  private final ExecMapperContext execContext = new ExecMapperContext();
+  private boolean abort = false;
+  private Deserializer inputKeyDeserializer;
+
+  // Input value serde needs to be an array to support different SerDe
+  // for different tags
+  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
+
+  TableDesc keyTableDesc;
+  TableDesc[] valueTableDesc;
+
+  ObjectInspector[] rowObjectInspector;
+  private Operator<?> reducer;
+  private boolean isTagged = false;
+
+  private Object keyObject = null;
+  private BytesWritable groupKey;
+
+  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+
+  @Override
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+      OutputCollector out){
+    super.init(jconf, mrReporter, inputs, out);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+
+    rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector keyObjectInspector;
+
+    ReduceWork redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
+    if (redWork == null) {
+      redWork = Utilities.getReduceWork(jconf);
+      cache.cache(REDUCE_PLAN_KEY, redWork);
+    }
+
+    reducer = redWork.getReducer();
+    reducer.setParentOperators(null); // clear out any parents as reducer is the
+    // root
+    isTagged = redWork.getNeedsTagging();
+    try {
+      keyTableDesc = redWork.getKeyDesc();
+      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
+      inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
+      for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
+        // We should initialize the SerDe with the TypeInfo when available.
+        valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag);
+        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+            valueTableDesc[tag].getDeserializerClass(), null);
+        inputValueDeserializer[tag].initialize(null, valueTableDesc[tag]
+            .getProperties());
+        valueObjectInspector[tag] = inputValueDeserializer[tag]
+            .getObjectInspector();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector[tag]);
+        rowObjectInspector[tag] = ObjectInspectorFactory
+            .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    MapredContext.init(false, new JobConf(jconf));
+
+    // initialize reduce operator tree
+    try {
+      l4j.info(reducer.dump(0));
+      reducer.initialize(jconf, rowObjectInspector);
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Reduce operator initialization failed", e);
+      }
+    }
+
+    reducer.setOutputCollector(out);
+    reducer.setReporter(reporter);
+    MapredContext.get().setReporter(reporter);
+
+
+  }
+
+  @Override
+  void run() throws IOException{
+    if (inputs.size() != 1) {
+      throw new IllegalArgumentException("ReduceRecordProcessor expects single 
input"
+          + ", inputCount=" + inputs.size());
+    }
+
+    //TODO - change this for joins
+    ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next();
+    KeyValuesReader reader = in.getReader();
+
+    //process records until done
+    while(reader.next()){
+      Object key = reader.getCurrentKey();
+      Iterable<Object> values = reader.getCurrentValues();
+      boolean needMore = processKeyValues(key, values);
+      if(!needMore){
+        break;
+      }
+    }
+  }
+
+  /**
+   * @param key
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processKeyValues(Object key, Iterable<Object> values) {
+    if(reducer.getDone()){
+      //done - no more records needed
+      return false;
+    }
+
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      BytesWritable keyWritable = (BytesWritable) key;
+
+      byte tag = 0;
+      if (isTagged) {
+        // remove the trailing tag byte from the key
+        // and store it in a separate variable.
+        int size = keyWritable.getSize() - 1;
+        tag = keyWritable.get()[size];
+        keyWritable.setSize(size);
+      }
+
+      //Set the key, check if this is a new group or same group
+      if (!keyWritable.equals(groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) { // the first group
+          groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          l4j.trace("End Group");
+          reducer.endGroup();
+        }
+
+        try {
+          keyObject = inputKeyDeserializer.deserialize(keyWritable);
+        } catch (Exception e) {
+          throw new HiveException(
+              "Hive Runtime Error: Unable to deserialize reduce input key from 
"
+              + Utilities.formatBinaryString(keyWritable.get(), 0,
+              keyWritable.getSize()) + " with properties "
+              + keyTableDesc.getProperties(), e);
+        }
+
+        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+        l4j.trace("Start Group");
+        reducer.startGroup();
+        reducer.setGroupKeyObject(keyObject);
+      }
+
+      //process all the values we have for this key
+      Iterator<Object> valuesIt = values.iterator();
+      while (valuesIt.hasNext()) {
+        BytesWritable valueWritable = (BytesWritable) valuesIt.next();
+        Object valueObj;
+        try {
+          valueObj = inputValueDeserializer[tag].deserialize(valueWritable);
+        } catch (SerDeException e) {
+          throw new HiveException(
+              "Hive Runtime Error: Unable to deserialize reduce input value 
(tag="
+              + tag
+              + ") from "
+              + Utilities.formatBinaryString(valueWritable.get(), 0,
+              valueWritable.getSize()) + " with properties "
+              + valueTableDesc[tag].getProperties(), e);
+        }
+        row.clear();
+        row.add(keyObject);
+        row.add(valueObj);
+
+        try {
+          reducer.process(row, tag);
+        } catch (Exception e) {
+          String rowString = null;
+          try {
+            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+          } catch (Exception e2) {
+            rowString = "[Error getting row data with exception " +
+                  StringUtils.stringifyException(e2) + " ]";
+          }
+          throw new HiveException("Hive Runtime Error while processing row 
(tag="
+              + tag + ") " + rowString, e);
+        }
+        if (isLogInfoEnabled) {
+          logProgress();
+        }
+      }
+
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+    return true; //give me more
+  }
+
+  @Override
+  void close(){
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+    // No row was processed
+    if (out == null) {
+      l4j.trace("Close called no row");
+    }
+
+    try {
+      if (groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        l4j.trace("End Group");
+        reducer.endGroup();
+      }
+      if (isLogInfoEnabled) {
+        logCloseInfo();
+      }
+
+      reducer.close(abort);
+      reportStats rps = new reportStats(reporter);
+      reducer.preorderMap(rps);
+
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing 
operators: "
+            + e.getMessage(), e);
+      }
+    } finally {
+      MapredContext.close();
+    }
+  }
+
+}

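The tag handling in processKeyValues() above follows Hive's MapReduce-era ExecReducer: when the plan needs tagging (multiple value sources shuffled to one reducer), the last byte of each serialized key is the tag, and it is stripped before the key is deserialized. That step in isolation (TagSketch is an illustrative name, not a Hive class):

    import org.apache.hadoop.io.BytesWritable;

    class TagSketch {
      // Strip and return the trailing tag byte, shrinking the key in place,
      // as ReduceRecordProcessor does when isTagged is set.
      static byte stripTag(BytesWritable keyWritable) {
        int size = keyWritable.getSize() - 1; // last byte is the tag
        byte tag = keyWritable.get()[size];
        keyWritable.setSize(size);            // key now excludes the tag
        return tag;
      }
    }
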
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1529076&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Fri Oct  4 04:02:12 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * Subclass of TezProcessor that marks this processor as running the reduce-side plan
+ */
+public class ReduceTezProcessor extends TezProcessor {
+  public ReduceTezProcessor(){
+    super(false);
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Oct  4 04:02:12 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,7 +26,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -42,16 +40,16 @@ import org.apache.tez.runtime.library.ap
  */
 public class TezProcessor implements LogicalIOProcessor {
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
+  private boolean isMap = false;
 
-  boolean isMap;
   RecordProcessor rproc = null;
 
   private JobConf jobConf;
 
   private TezProcessorContext processorContext;
 
-  public TezProcessor() {
-    this.isMap = true;
+  public TezProcessor(boolean isMap) {
+    this.isMap = isMap;
   }
 
   @Override
@@ -97,15 +95,7 @@ public class TezProcessor implements Log
     LogicalInput in = inputs.values().iterator().next();
     LogicalOutput out = outputs.values().iterator().next();
 
-    MRInput input = (MRInput)in;
 
-    //update config
-    Configuration updatedConf = input.getConfigUpdates();
-    if (updatedConf != null) {
-      for (Entry<String, String> entry : updatedConf) {
-        this.jobConf.set(entry.getKey(), entry.getValue());
-      }
-    }
 
     KeyValueWriter kvWriter = (KeyValueWriter)out.getWriter();
     OutputCollector collector = new KVOutputCollector(kvWriter);
@@ -114,11 +104,11 @@ public class TezProcessor implements Log
       rproc = new MapRecordProcessor();
     }
     else{
-      throw new UnsupportedOperationException("Reduce is yet to be 
implemented");
+      rproc = new ReduceRecordProcessor();
     }
 
     MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, mrReporter, inputs.values(), collector);
+    rproc.init(jobConf, mrReporter, inputs, collector);
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter

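Net effect of the TezProcessor changes: the config-update loop moves into MapRecordProcessor (reduce tasks have no MRInput), and run() reduces to a dispatch on the flag set by the subclass constructor. Condensed from the hunks above:

    // Pick the record processor for this vertex based on the subclass flag.
    rproc = isMap ? new MapRecordProcessor() : new ReduceRecordProcessor();
    rproc.init(jobConf, mrReporter, inputs, collector);
    rproc.run();
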
