Author: gunther
Date: Fri Oct 4 04:02:12 2013
New Revision: 1529076

URL: http://svn.apache.org/r1529076
Log:
HIVE-5390: custom LogicalIOProcessor - reduce record processor (Thejas Nair via Gunther Hagleitner)
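In short: TezProcessor no longer hardcodes map-side behavior. Two thin subclasses (MapTezProcessor, ReduceTezProcessor) fix the map/reduce flag at construction time, DagUtils points each vertex at the matching class, and the new ReduceRecordProcessor supplies the reduce-side record loop that was previously unimplemented. Condensed from the DagUtils hunks below (surrounding vertex setup omitted):

    // map vertex: generic TezProcessor -> MapTezProcessor
    map = new Vertex(mapWork.getName(),
        new ProcessorDescriptor(MapTezProcessor.class.getName()).
        setUserPayload(serializedConf),
        inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));

    // reduce vertex: Tez MR-library ReduceProcessor -> Hive's own ReduceTezProcessor
    Vertex reducer = new Vertex(reduceWork.getName(),
        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
        setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
        reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));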
Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Oct 4 04:02:12 2013
@@ -78,8 +78,6 @@ import org.apache.tez.mapreduce.hadoop.M
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 
 /**
@@ -233,7 +231,7 @@ public class DagUtils {
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     if (inputSplitInfo.getNumTasks() != 0) {
       map = new Vertex(mapWork.getName(),
-          new ProcessorDescriptor(TezProcessor.class.getName()).
+          new ProcessorDescriptor(MapTezProcessor.class.getName()).
           setUserPayload(serializedConf),
           inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
       Map<String, String> environment = new HashMap<String, String>();
@@ -313,7 +311,7 @@ public class DagUtils {
 
     // create the vertex
     Vertex reducer = new Vertex(reduceWork.getName(),
-        new ProcessorDescriptor(ReduceProcessor.class.getName()).
+        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
         setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
         reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Oct 4 04:02:12 2013
@@ -18,10 +18,12 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -45,26 +47,36 @@
  */
 public class MapRecordProcessor extends RecordProcessor{
 
-  private static final String PLAN_KEY = "__MAP_PLAN__";
+  private MapOperator mapOp;
   public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
   private final ExecMapperContext execContext = new ExecMapperContext();
   private boolean abort = false;
+  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
 
   @Override
-  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
     super.init(jconf, mrReporter, inputs, out);
 
+    //Update JobConf using MRInput, info like filename comes via this
+    MRInput mrInput = getMRInput(inputs);
+    Configuration updatedConf = mrInput.getConfigUpdates();
+    if (updatedConf != null) {
+      for (Entry<String, String> entry : updatedConf) {
+        jconf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
       execContext.setJc(jconf);
       // create map and fetch operators
-      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
+      MapWork mrwork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
       if (mrwork == null) {
         mrwork = Utilities.getMapWork(jconf);
-        cache.cache(PLAN_KEY, mrwork);
+        cache.cache(MAP_PLAN_KEY, mrwork);
       }
 
       mapOp = new MapOperator();
@@ -94,6 +106,21 @@ public class MapRecordProcessor extends
     }
   }
 
+  private MRInput getMRInput(Map<String, LogicalInput> inputs) {
+    //there should be only one MRInput
+    MRInput theMRInput = null;
+    for(LogicalInput inp : inputs.values()){
+      if(inp instanceof MRInput){
+        if(theMRInput != null){
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        //a better logic would be to find the alias
+        theMRInput = (MRInput)inp;
+      }
+    }
+    return theMRInput;
+  }
+
   @Override
   void run() throws IOException{
     if (inputs.size() != 1) {
@@ -101,7 +128,7 @@ public class MapRecordProcessor extends
           + ", inputCount=" + inputs.size());
     }
 
-    MRInput in = (MRInput)inputs.iterator().next();
+    MRInput in = getMRInput(inputs);
     KeyValueReader reader = in.getReader();
 
     //process records until done
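Two things moved into MapRecordProcessor here: pulling config updates (e.g. the input file name) out of MRInput, which TezProcessor used to do (see the lines removed from TezProcessor at the end of this mail), and locating the MRInput itself. getMRInput() scans inputs.values() with instanceof because the processor does not know which source name the MRInput is registered under; the in-code comment "a better logic would be to find the alias" points at the keyed lookup this would become if the alias were threaded through. Roughly (the inputAlias variable is hypothetical, not part of this patch):

    // hypothetical keyed lookup, for contrast with the instanceof scan above
    LogicalInput in = inputs.get(inputAlias);
    if (!(in instanceof MRInput)) {
      throw new IllegalArgumentException("Expected MRInput for " + inputAlias);
    }
    MRInput mrInput = (MRInput) in;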
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1529076&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Fri Oct 4 04:02:12 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * Subclass that is used to indicate if this is a map or reduce process
+ */
+public class MapTezProcessor extends TezProcessor {
+  public MapTezProcessor(){
+    super(true);
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Fri Oct 4 04:02:12 2013
@@ -21,7 +21,7 @@ import java.lang.management.ManagementFa
 import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,7 +37,7 @@ import org.apache.tez.runtime.api.Logica
 public abstract class RecordProcessor {
 
   protected JobConf jconf;
-  protected Collection<LogicalInput> inputs;
+  protected Map<String, LogicalInput> inputs;
   protected OutputCollector out;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -59,7 +59,7 @@ public abstract class RecordProcessor {
   * @param inputs
   * @param out
   */
-  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
     this.jconf = jconf;
     this.reporter = mrReporter;
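The Collection to Map change above is what enables both of the previous diffs: Tez hands a LogicalIOProcessor its inputs keyed by source name, and forwarding the Map unflattened lets each RecordProcessor decide how to pick its input. A minimal sketch of the two access patterns this opens up (the name string is illustrative only, not from this patch):

    // single-input processors can still grab the only element...
    LogicalInput only = inputs.values().iterator().next();
    // ...while multi-input processors (e.g. future joins) can go by name
    LogicalInput byName = inputs.get("some-source-vertex");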
-0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.ObjectCache; +import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.tez.mapreduce.processor.MRTaskReporter; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValuesReader; +import org.apache.tez.runtime.library.input.ShuffledMergedInput; + +/** + * Process input from tez LogicalInput and write output - for a map plan + * Just pump the records through the query plan. 
+ */
+public class ReduceRecordProcessor extends RecordProcessor{
+  private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
+
+  public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
+  private final ExecMapperContext execContext = new ExecMapperContext();
+  private boolean abort = false;
+  private Deserializer inputKeyDeserializer;
+
+  // Input value serde needs to be an array to support different SerDe
+  // for different tags
+  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
+
+  TableDesc keyTableDesc;
+  TableDesc[] valueTableDesc;
+
+  ObjectInspector[] rowObjectInspector;
+  private Operator<?> reducer;
+  private boolean isTagged = false;
+
+  private Object keyObject = null;
+  private BytesWritable groupKey;
+
+  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+
+  @Override
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+      OutputCollector out){
+    super.init(jconf, mrReporter, inputs, out);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+
+    rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector keyObjectInspector;
+
+    ReduceWork redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
+    if (redWork == null) {
+      redWork = Utilities.getReduceWork(jconf);
+      cache.cache(REDUCE_PLAN_KEY, redWork);
+    }
+
+    reducer = redWork.getReducer();
+    reducer.setParentOperators(null); // clear out any parents as reducer is the
+                                      // root
+    isTagged = redWork.getNeedsTagging();
+    try {
+      keyTableDesc = redWork.getKeyDesc();
+      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
+      inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
+      for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
+        // We should initialize the SerDe with the TypeInfo when available.
+        valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag);
+        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+            valueTableDesc[tag].getDeserializerClass(), null);
+        inputValueDeserializer[tag].initialize(null, valueTableDesc[tag]
+            .getProperties());
+        valueObjectInspector[tag] = inputValueDeserializer[tag]
+            .getObjectInspector();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector[tag]);
+        rowObjectInspector[tag] = ObjectInspectorFactory
+            .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    MapredContext.init(false, new JobConf(jconf));
+
+    // initialize reduce operator tree
+    try {
+      l4j.info(reducer.dump(0));
+      reducer.initialize(jconf, rowObjectInspector);
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Reduce operator initialization failed", e);
+      }
+    }
+
+    reducer.setOutputCollector(out);
+    reducer.setReporter(reporter);
+    MapredContext.get().setReporter(reporter);
+
+
+  }
+
+  @Override
+  void run() throws IOException{
+    if (inputs.size() != 1) {
+      throw new IllegalArgumentException("ReduceRecordProcessor expects single input"
+          + ", inputCount=" + inputs.size());
+    }
+
+    //TODO - change this for joins
+    ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next();
+    KeyValuesReader reader = in.getReader();
+
+    //process records until done
+    while(reader.next()){
+      Object key = reader.getCurrentKey();
+      Iterable<Object> values = reader.getCurrentValues();
+      boolean needMore = processKeyValues(key, values);
+      if(!needMore){
+        break;
+      }
+    }
+  }
+
+  /**
+   * @param key
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processKeyValues(Object key, Iterable<Object> values) {
+    if(reducer.getDone()){
+      //done - no more records needed
+      return false;
+    }
+
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      BytesWritable keyWritable = (BytesWritable) key;
+
+      byte tag = 0;
+      if (isTagged) {
+        // remove the tag from key coming out of reducer
+        // and store it in separate variable.
+        int size = keyWritable.getSize() - 1;
+        tag = keyWritable.get()[size];
+        keyWritable.setSize(size);
+      }
+
+      //Set the key, check if this is a new group or same group
+      if (!keyWritable.equals(groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) { // the first group
+          groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          l4j.trace("End Group");
+          reducer.endGroup();
+        }
+
+        try {
+          keyObject = inputKeyDeserializer.deserialize(keyWritable);
+        } catch (Exception e) {
+          throw new HiveException(
+              "Hive Runtime Error: Unable to deserialize reduce input key from "
+                  + Utilities.formatBinaryString(keyWritable.get(), 0,
+                      keyWritable.getSize()) + " with properties "
+                  + keyTableDesc.getProperties(), e);
+        }
+
+        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+        l4j.trace("Start Group");
+        reducer.startGroup();
+        reducer.setGroupKeyObject(keyObject);
+      }
+
+      //process all the values we have for this key
+      Iterator<Object> valuesIt = values.iterator();
+      while (valuesIt.hasNext()) {
+        BytesWritable valueWritable = (BytesWritable) valuesIt.next();
+        Object valueObj;
+        try {
+          valueObj = inputValueDeserializer[tag].deserialize(valueWritable);
+        } catch (SerDeException e) {
+          throw new HiveException(
+              "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+                  + tag
+                  + ") from "
+                  + Utilities.formatBinaryString(valueWritable.get(), 0,
+                      valueWritable.getSize()) + " with properties "
+                  + valueTableDesc[tag].getProperties(), e);
+        }
+        row.clear();
+        row.add(keyObject);
+        row.add(valueObj);
+
+        try {
+          reducer.process(row, tag);
+        } catch (Exception e) {
+          String rowString = null;
+          try {
+            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+          } catch (Exception e2) {
+            rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+          }
+          throw new HiveException("Hive Runtime Error while processing row (tag="
+              + tag + ") " + rowString, e);
+        }
+        if (isLogInfoEnabled) {
+          logProgress();
+        }
+      }
+
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+    return true; //give me more
+  }
+
+  @Override
+  void close(){
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+
+    // No row was processed
+    if (out == null) {
+      l4j.trace("Close called no row");
+    }
+
+    try {
+      if (groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        l4j.trace("End Group");
+        reducer.endGroup();
+      }
+      if (isLogInfoEnabled) {
+        logCloseInfo();
+      }
+
+      reducer.close(abort);
+      reportStats rps = new reportStats(reporter);
+      reducer.preorderMap(rps);
+
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators: "
+            + e.getMessage(), e);
+      }
+    } finally {
+      MapredContext.close();
+    }
+  }
+
+}
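The per-key logic above closely follows ExecReducer from the MR execution path: a group boundary is detected by comparing the raw key bytes against the previous groupKey, and when the plan needs tagging (multi-parent reduce sinks, e.g. joins), the tag identifying the source is carried as the last byte of the serialized key. A compressed view of that flow, with error handling elided:

    byte tag = 0;
    if (isTagged) {
      int size = keyWritable.getSize() - 1;
      tag = keyWritable.get()[size];     // last key byte is the source tag
      keyWritable.setSize(size);         // shrink back to the real key bytes
    }
    if (!keyWritable.equals(groupKey)) { // new key group
      if (groupKey != null) {
        reducer.endGroup();              // close out the previous group
      } else {
        groupKey = new BytesWritable();  // very first group
      }
      keyObject = inputKeyDeserializer.deserialize(keyWritable);
      groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
      reducer.startGroup();
      reducer.setGroupKeyObject(keyObject);
    }
    row.clear();                         // each row is a (key, value) struct
    row.add(keyObject);
    row.add(valueObj);
    reducer.process(row, tag);           // tag selects deserializer/inspector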
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1529076&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Fri Oct 4 04:02:12 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * Subclass that is used to indicate if this is a map or reduce process
+ */
+public class ReduceTezProcessor extends TezProcessor {
+  public ReduceTezProcessor(){
+    super(false);
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1529076&r1=1529075&r2=1529076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Oct 4 04:02:12 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,7 +26,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -42,16 +40,16 @@ import org.apache.tez.runtime.library.ap
  */
 public class TezProcessor implements LogicalIOProcessor {
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
+  private boolean isMap = false;
 
-  boolean isMap;
   RecordProcessor rproc = null;
 
   private JobConf jobConf;
 
   private TezProcessorContext processorContext;
 
-  public TezProcessor() {
-    this.isMap = true;
+  public TezProcessor(boolean isMap) {
+    this.isMap = isMap;
   }
 
@@ -97,15 +95,7 @@ public class TezProcessor implements Log
     LogicalInput in = inputs.values().iterator().next();
     LogicalOutput out = outputs.values().iterator().next();
 
-    MRInput input = (MRInput)in;
-    //update config
-    Configuration updatedConf = input.getConfigUpdates();
-    if (updatedConf != null) {
-      for (Entry<String, String> entry : updatedConf) {
-        this.jobConf.set(entry.getKey(), entry.getValue());
-      }
-    }
     KeyValueWriter kvWriter = (KeyValueWriter)out.getWriter();
     OutputCollector collector = new KVOutputCollector(kvWriter);
@@ -114,11 +104,11 @@ public class TezProcessor implements Log
       rproc = new MapRecordProcessor();
     }
     else{
-      throw new UnsupportedOperationException("Reduce is yet to be implemented");
+      rproc = new ReduceRecordProcessor();
    }
 
     MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, mrReporter, inputs.values(), collector);
+    rproc.init(jobConf, mrReporter, inputs, collector);
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter
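With the constructor change, the two new subclasses are nothing more than compile-time bindings of the isMap flag:

    public class MapTezProcessor extends TezProcessor {
      public MapTezProcessor() { super(true); }
    }

    public class ReduceTezProcessor extends TezProcessor {
      public ReduceTezProcessor() { super(false); }
    }

Tez instantiates the processor reflectively from the class name carried in the ProcessorDescriptor (see the DagUtils hunks above), so each concrete class needs a no-arg constructor; putting the flag in the class itself is what lets the map/reduce choice survive that reflective construction.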