pgaref commented on a change in pull request #2364:
URL: https://github.com/apache/hive/pull/2364#discussion_r648392171
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -1824,6 +1824,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
 HIVEALIAS("hive.alias", "", ""),
 HIVEMAPSIDEAGGREGATE("hive.map.aggr", true, "Whether to use map-side aggregation in Hive Group By queries"),
 HIVEGROUPBYSKEW("hive.groupby.skewindata", false, "Whether there is skew in data to optimize group by queries"),
+
+ HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
Review comment:
If it's map-specific, maybe rename to HIVE_ENABLE_COMBINER_FOR_MAP_GROUP_BY (?)
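A minimal sketch of how the entry would look with the suggested rename (only the enum constant name changes relative to the proposed patch; the description is abbreviated here, and whether the property key itself should also mention "map" is left open):

    HIVE_ENABLE_COMBINER_FOR_MAP_GROUP_BY("hive.enable.combiner.for.groupby", true,
        "Whether to enable tez combiner to aggregate the records after sorting is done. "
            + "It is supported only for map side aggregation."),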
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -1824,6 +1824,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
 HIVEALIAS("hive.alias", "", ""),
 HIVEMAPSIDEAGGREGATE("hive.map.aggr", true, "Whether to use map-side aggregation in Hive Group By queries"),
 HIVEGROUPBYSKEW("hive.groupby.skewindata", false, "Whether there is skew in data to optimize group by queries"),
+
+ HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
+ "Whether to enable tez combiner to aggregate the records after sorting is done. It is supported "
+ + "only for map side aggregation. This will be useful where the map side hash aggregation is converted to "
Review comment:
nit: Mostly useful when map-side hash aggregation switches to streaming mode due to high memory utilization, etc.
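Folding that wording into the proposed description, the entry might read roughly as follows (a sketch; only the second sentence changes relative to the patch):

    HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
        "Whether to enable tez combiner to aggregate the records after sorting is done. "
            + "It is supported only for map side aggregation. Mostly useful when map-side hash "
            + "aggregation switches to streaming mode due to high memory utilization."),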
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the
partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ AbstractSerDe valueSerializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+ DataInputBuffer prevKey;
+ BytesWritable valWritable;
+ DataInputBuffer prevVal;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+
ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length
!= numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does"
+
+ " not match with number of aggregators");
+ numValueCol = 0;
+ rw = null;
+ return;
+ }
+ valueSerializer = (AbstractSerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ valueSerializer.initialize(null,
+ valueTableDesc.getProperties(), null);
+
+ aggrObjectInspector = groupByOperator.getAggrObjInspector();
+ valueBuffer = new DataInputBuffer();
+ cachedValues = new Object[aggregationEvaluators.length];
+ prevKey = new DataInputBuffer();
+ valWritable = new BytesWritable();
+ prevVal = new DataInputBuffer();
+ } catch (Exception e) {
+ LOG.error(" GroupByCombiner failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ }
+
+ private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+ throws Exception {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ cachedValues[i] =
aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+ }
+ BytesWritable result = (BytesWritable)
valueSerializer.serialize(cachedValues,
+ aggrObjectInspector);
+ valueBuffer.reset(result.getBytes(), result.getLength());
+ writer.append(key, valueBuffer);
+ combineOutputRecordsCounter.increment(1);
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ aggregationEvaluators[i].reset(aggregationBuffers[i]);
+ }
+ }
+
+ private void updateAggregation()
+ throws HiveException, SerDeException {
+ valWritable.set(prevVal.getData(), prevVal.getPosition(),
+ prevVal.getLength() - prevVal.getPosition());
+ Object row = valueSerializer.deserialize(valWritable);
+ groupByOperator.updateAggregation(row);
+ }
+
+ private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer
writer) {
+ long numRows = 0;
+ try {
+ DataInputBuffer key = rawIter.getKey();
+ prevKey.reset(key.getData(), key.getPosition(), key.getLength() -
key.getPosition());
+ prevVal.reset(rawIter.getValue().getData(),
rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() -
rawIter.getValue().getPosition());
+ int numSameKey = 0;
+ while (rawIter.next()) {
+ key = rawIter.getKey();
+ if (!VectorGroupByCombiner.equals(key, prevKey)) {
+ // if current key is not equal to the previous key then we have to
emit the
+ // record. In case only one record was present for this key, then no
need to
+ // do aggregation, We can directly append the key and value. For key
with more
+ // than one record, we have to update the aggregation for the
current value only
+ // as for previous values (records) aggregation is already done in
previous
+ // iteration of loop.
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ prevKey.reset(key.getData(), key.getPosition(),
+ key.getLength() - key.getPosition());
+ numSameKey = 0;
+ } else {
+ // If there are more than one record with same key then update the
aggregation.
+ updateAggregation();
+ numSameKey++;
+ }
+ prevVal.reset(rawIter.getValue().getData(),
rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() -
rawIter.getValue().getPosition());
+ numRows++;
+ }
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ combineInputRecordsCounter.increment(numRows);
+ } catch(Exception e) {
+ LOG.error("processRows failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ try {
+ if (!rawIter.next()) {
+ return;
+ }
+ if (numValueCol == 0) {
+ // For no aggregation, RLE in writer will take care of reduction.
+ appendDirectlyToWriter(rawIter, writer);
+ } else {
+ processRows(rawIter, writer);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to combine rows", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ public static JobConf setCombinerInConf(BaseWork dest, JobConf conf, JobConf destConf) {
+ if (conf == null || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY)) {
+ return conf;
+ }
+
+ // As of now, this function is called for all edges. We are interested only on edges
+ // to reducer.
+ if (!(dest instanceof ReduceWork)) {
Review comment:
nit: add this check to the condition above.
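A sketch of the merged check in setCombinerInConf, assuming the same early-return behavior as in the patch:

    // Fold the ReduceWork check into the existing guard so there is a single early return.
    if (conf == null || !(dest instanceof ReduceWork)
        || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY)) {
      return conf;
    }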
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
+ protected final Configuration conf;
+ protected final TezCounter combineInputRecordsCounter;
+ protected final TezCounter combineOutputRecordsCounter;
+ VectorAggregateExpression[] aggregators;
+ VectorAggregationBufferRow aggregationBufferRow;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from
columns of a row
+ // in a vectorized row batch.
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite>
valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ protected transient ByteStream.Output valueOutput;
+ DataInputBuffer valueBytesWritable;
+
+ // Only required minimal configs are copied to the worker nodes. This hack
(file.) is
+ // done to include these configs to be copied to the worker node.
+ protected static String confPrefixForWorker = "file.";
+
+ VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+ int firstValueColumnOffset;
+ VectorizedRowBatchCtx batchContext = null;
+ protected int numValueCol = 0;
+ protected ReduceWork rw;
+ VectorizedRowBatch outputBatch = null;
+ VectorizedRowBatch inputBatch = null;
+ protected AbstractSerDe inputKeyDeserializer = null;
+ protected ObjectInspector keyObjectInspector = null;
+ protected ObjectInspector valueObjectInspector = null;
+ protected StructObjectInspector valueStructInspectors = null;
+ protected StructObjectInspector keyStructInspector = null;
+
+ public VectorGroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+
+ combineInputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combineOutputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+ conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+ rw = getReduceWork(taskContext);
+ if (rw == null) {
+ return;
+ }
+
+ if (rw.getReducer() instanceof VectorGroupByOperator) {
+ VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator)
rw.getReducer();
+ vectorGroupByOperator.initializeOp(this.conf);
+ this.aggregators = vectorGroupByOperator.getAggregators();
+ this.aggregationBufferRow = allocateAggregationBuffer();
+ batchContext = rw.getVectorizedRowBatchCtx();
+ if ((aggregators == null) || (aggregators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does" +
+ " not match with number of aggregators");
+ rw = null;
+ numValueCol = 0;
+ return;
+ }
+ }
+
+ try {
+ initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+ if (batchContext != null && numValueCol > 0) {
+ initVectorBatches();
+ }
+ } catch (SerDeException e) {
+ LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ protected static String getConfigPrefix(String destName) {
+ return confPrefixForWorker + destName;
+ }
+
+ // Get the reduce work from the config. Here some hack is used to prefix the
config name with
+ // "file." to avoid the config being filtered out.
+ private ReduceWork getReduceWork(TaskContext context) {
+ String destVertexName;
+ if (context instanceof TezOutputContextImpl) {
+ destVertexName =
((TezOutputContextImpl)context).getDestinationVertexName();
+ } else {
+ // As of now only map side combiner is supported.
+ return null;
+ }
+
+ String plan = conf.get(getConfigPrefix(destVertexName) +
+ HiveConf.ConfVars.PLAN.varname);
+ if (plan == null) {
+ LOG.info("Reduce plan is not set for vertex " + destVertexName);
+ return null;
+ }
+ this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+ if (conf.getBoolean(getConfigPrefix(destVertexName)
+ + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+ true)) {
+ Path planPath = new Path(plan);
+ planPath = new Path(planPath, REDUCE_PLAN_NAME);
+ String planString = conf.get(getConfigPrefix(destVertexName) +
+ planPath.toUri().getPath());
+ this.conf.set(planPath.toUri().getPath(), planString);
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+ } else {
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+ }
+ this.conf.set(HAS_REDUCE_WORK, "true");
+ this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+ return Utilities.getReduceWork(conf);
+ }
+
+ private void initObjectInspectors(TableDesc valueTableDesc,TableDesc
keyTableDesc)
+ throws SerDeException {
+ inputKeyDeserializer =
+ ReflectionUtils.newInstance(keyTableDesc.getSerDeClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties(), null);
+ keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ keyStructInspector = (StructObjectInspector) keyObjectInspector;
+ firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+ AbstractSerDe inputValueDeserializer = (AbstractSerDe)
ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc.getProperties(),
null);
+ valueObjectInspector = inputValueDeserializer.getObjectInspector();
+ valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+ numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+ }
+
+ void initVectorBatches() throws HiveException {
+ inputBatch = batchContext.createVectorizedRowBatch();
+
+ // Create data buffers for value bytes column vectors.
+ for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+ ColumnVector colVector = inputBatch.cols[i];
+ if (colVector instanceof BytesColumnVector) {
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+ bytesColumnVector.initBuffer();
+ }
+ }
+
+ batchValueDeserializer =
+ new VectorDeserializeRow<>(
+ new LazyBinaryDeserializeRead(
+
VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+ valueStructInspectors),
+ true));
+ batchValueDeserializer.init(firstValueColumnOffset);
+
+ int[] valueColumnMap = new int[numValueCol];
+ for (int i = 0; i < numValueCol; i++) {
+ valueColumnMap[i] = i + firstValueColumnOffset;
+ }
+
+ valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(numValueCol);
+ valueVectorSerializeRow = new
VectorSerializeRow<>(valueLazyBinarySerializeWrite);
+
valueVectorSerializeRow.init(VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+ valueStructInspectors), valueColumnMap);
+ valueOutput = new ByteStream.Output();
+ valueVectorSerializeRow.setOutput(valueOutput);
+ outputBatch = batchContext.createVectorizedRowBatch();
+ valueBytesWritable = new DataInputBuffer();
+ }
+
+ private VectorAggregationBufferRow allocateAggregationBuffer() throws
HiveException {
+ VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+ new
VectorAggregateExpression.AggregationBuffer[aggregators.length];
+ for (int i=0; i < aggregators.length; ++i) {
+ aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+ aggregators[i].reset(aggregationBuffers[i]);
+ }
+ return new VectorAggregationBufferRow(aggregationBuffers);
+ }
+
+ private void finishAggregation(DataInputBuffer key, IFile.Writer writer,
boolean needFlush)
+ throws HiveException, IOException {
+ for (int i = 0; i < aggregators.length; ++i) {
+ try {
+
aggregators[i].aggregateInput(aggregationBufferRow.getAggregationBuffer(i),
inputBatch);
+ } catch (HiveException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ // In case the input batch is full but the keys are still same we need not
flush.
+ // Only evaluate the aggregates and store it in the aggregationBufferRow.
The aggregate
+ // functions are incremental and will take care of correctness when next
batch comes for
+ // aggregation.
+ if (!needFlush) {
+ return;
+ }
+
+ int colNum = firstValueColumnOffset;
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].assignRowColumn(outputBatch, 0, colNum++,
+ aggregationBufferRow.getAggregationBuffer(i));
+ }
+
+ valueLazyBinarySerializeWrite.reset();
+ valueVectorSerializeRow.serializeWrite(outputBatch, 0);
+ valueBytesWritable.reset(valueOutput.getData(), 0,
valueOutput.getLength());
+ writer.append(key, valueBytesWritable);
+ combineOutputRecordsCounter.increment(1);
+ aggregationBufferRow.reset();
+ outputBatch.reset();
+ }
+
+ private void addValueToBatch(DataInputBuffer val, DataInputBuffer key,
+ IFile.Writer writer, boolean needFLush) throws
IOException, HiveException {
+ batchValueDeserializer.setBytes(val.getData(), val.getPosition(),
+ val.getLength() - val.getPosition());
+ batchValueDeserializer.deserialize(inputBatch, inputBatch.size);
+ inputBatch.size++;
+ if (needFLush || (inputBatch.size >= VectorizedRowBatch.DEFAULT_SIZE)) {
+ processVectorGroup(key, writer, needFLush);
+ }
+ }
+
+ private void processVectorGroup(DataInputBuffer key, IFile.Writer writer,
boolean needFlush)
+ throws HiveException {
+ try {
+ finishAggregation(key, writer, needFlush);
+ inputBatch.reset();
+ } catch (Exception e) {
+ String rowString;
+ try {
+ rowString = inputBatch.toString();
+ } catch (Exception e2) {
+ rowString = "[Error getting row data with exception "
+ + StringUtils.stringifyException(e2) + " ]";
+ }
+ LOG.error("Hive Runtime Error while processing vector batch" +
rowString, e);
+ throw new HiveException("Hive Runtime Error while processing vector
batch", e);
+ }
+ }
+
+ protected void appendDirectlyToWriter(TezRawKeyValueIterator rawIter,
IFile.Writer writer) {
+ long numRows = 0;
+ try {
+ do {
+ numRows++;
+ writer.append(rawIter.getKey(), rawIter.getValue());
+ } while (rawIter.next());
+ combineInputRecordsCounter.increment(numRows);
+ combineOutputRecordsCounter.increment(numRows);
+ } catch(IOException e) {
+ LOG.error("Append to writer failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ private void appendToWriter(DataInputBuffer val, DataInputBuffer key,
IFile.Writer writer) {
+ try {
+ writer.append(key, val);
+ combineOutputRecordsCounter.increment(1);
+ } catch(IOException e) {
+ LOG.error("Append value list to writer failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ public static boolean equals(DataInputBuffer buf1, DataInputBuffer buf2) {
+ byte[] b1 = buf1.getData();
+ int s1 = buf1.getPosition();
+ int l1 = buf1.getLength() - buf1.getPosition();
+ byte[] b2 = buf2.getData();
+ int s2 = buf2.getPosition();
+ int l2 = buf2.getLength() - buf2.getPosition();
+ return org.apache.hadoop.hive.ql.exec.util.FastByteComparisons.equals(b1,
s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ try {
+ if (!rawIter.next()) {
+ return;
+ }
+
+ if (numValueCol == 0) {
+ // For no aggregation, RLE in writer will take care of reduction.
+ //TODO this can be optimized further as RLE will still write length of
value.
+ appendDirectlyToWriter(rawIter, writer);
+ return;
+ }
+
+ DataInputBuffer key = rawIter.getKey();
+ DataInputBuffer prevKey = new DataInputBuffer();
+ prevKey.reset(key.getData(), key.getPosition(), key.getLength() -
key.getPosition());
+ DataInputBuffer valTemp = rawIter.getValue();
+ DataInputBuffer val = new DataInputBuffer();
+ val.reset(valTemp.getData(), valTemp.getPosition(),
+ valTemp.getLength() - valTemp.getPosition());
+
+ int numValues = 1;
+ long numRows = 1;
+
+ while (rawIter.next()) {
+ key = rawIter.getKey();
+ if (!equals(prevKey, key)) {
+ if (numValues == 1) {
+ // if key has single record, no need for aggregation.
+ appendToWriter(val, prevKey, writer);
+ } else {
+ addValueToBatch(val, prevKey, writer, true);
+ }
+ prevKey.reset(key.getData(), key.getPosition(),
+ key.getLength() - key.getPosition());
+ numValues = 0;
+ } else {
+ addValueToBatch(val, prevKey, writer, false);
+ }
+ valTemp = rawIter.getValue();
+ val.reset(valTemp.getData(), valTemp.getPosition(),
+ valTemp.getLength() - valTemp.getPosition());
+ numRows++;
+ numValues++;
+ }
+
+ // Process the last key.
+ if (numValues == 1) {
+ appendToWriter(val, prevKey, writer);
+ } else {
+ addValueToBatch(val, prevKey, writer, true);
+ }
+ combineInputRecordsCounter.increment(numRows);
+ } catch (IOException | HiveException e) {
+ LOG.error("Failed to combine rows", e);
Review comment:
log or throw (not both)
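One way to follow that here is to throw only, carrying the original exception as the cause (a sketch; the RuntimeException wrapper is kept as in the patch):

    } catch (IOException | HiveException e) {
      // Rethrow with the cause attached instead of logging and then wrapping only the message.
      throw new RuntimeException("Failed to combine rows", e);
    }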
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
+ protected final Configuration conf;
+ protected final TezCounter combineInputRecordsCounter;
+ protected final TezCounter combineOutputRecordsCounter;
+ VectorAggregateExpression[] aggregators;
+ VectorAggregationBufferRow aggregationBufferRow;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from
columns of a row
+ // in a vectorized row batch.
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite>
valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ protected transient ByteStream.Output valueOutput;
+ DataInputBuffer valueBytesWritable;
+
+ // Only required minimal configs are copied to the worker nodes. This hack
(file.) is
+ // done to include these configs to be copied to the worker node.
+ protected static String confPrefixForWorker = "file.";
+
+ VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+ int firstValueColumnOffset;
+ VectorizedRowBatchCtx batchContext = null;
+ protected int numValueCol = 0;
+ protected ReduceWork rw;
+ VectorizedRowBatch outputBatch = null;
+ VectorizedRowBatch inputBatch = null;
+ protected AbstractSerDe inputKeyDeserializer = null;
+ protected ObjectInspector keyObjectInspector = null;
+ protected ObjectInspector valueObjectInspector = null;
+ protected StructObjectInspector valueStructInspectors = null;
+ protected StructObjectInspector keyStructInspector = null;
+
+ public VectorGroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+
+ combineInputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combineOutputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+ conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+ rw = getReduceWork(taskContext);
+ if (rw == null) {
+ return;
+ }
+
+ if (rw.getReducer() instanceof VectorGroupByOperator) {
+ VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator)
rw.getReducer();
+ vectorGroupByOperator.initializeOp(this.conf);
+ this.aggregators = vectorGroupByOperator.getAggregators();
+ this.aggregationBufferRow = allocateAggregationBuffer();
+ batchContext = rw.getVectorizedRowBatchCtx();
+ if ((aggregators == null) || (aggregators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does" +
+ " not match with number of aggregators");
+ rw = null;
+ numValueCol = 0;
+ return;
+ }
+ }
+
+ try {
+ initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+ if (batchContext != null && numValueCol > 0) {
+ initVectorBatches();
+ }
+ } catch (SerDeException e) {
+ LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ protected static String getConfigPrefix(String destName) {
+ return confPrefixForWorker + destName;
+ }
+
+ // Get the reduce work from the config. Here some hack is used to prefix the
config name with
+ // "file." to avoid the config being filtered out.
+ private ReduceWork getReduceWork(TaskContext context) {
+ String destVertexName;
+ if (context instanceof TezOutputContextImpl) {
+ destVertexName =
((TezOutputContextImpl)context).getDestinationVertexName();
+ } else {
+ // As of now only map side combiner is supported.
+ return null;
+ }
+
+ String plan = conf.get(getConfigPrefix(destVertexName) +
+ HiveConf.ConfVars.PLAN.varname);
+ if (plan == null) {
+ LOG.info("Reduce plan is not set for vertex " + destVertexName);
+ return null;
+ }
+ this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+ if (conf.getBoolean(getConfigPrefix(destVertexName)
+ + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+ true)) {
+ Path planPath = new Path(plan);
+ planPath = new Path(planPath, REDUCE_PLAN_NAME);
+ String planString = conf.get(getConfigPrefix(destVertexName) +
+ planPath.toUri().getPath());
+ this.conf.set(planPath.toUri().getPath(), planString);
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+ } else {
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+ }
+ this.conf.set(HAS_REDUCE_WORK, "true");
+ this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+ return Utilities.getReduceWork(conf);
+ }
+
+ private void initObjectInspectors(TableDesc valueTableDesc,TableDesc
keyTableDesc)
+ throws SerDeException {
+ inputKeyDeserializer =
+ ReflectionUtils.newInstance(keyTableDesc.getSerDeClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties(), null);
+ keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ keyStructInspector = (StructObjectInspector) keyObjectInspector;
+ firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+ AbstractSerDe inputValueDeserializer = (AbstractSerDe)
ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc.getProperties(),
null);
+ valueObjectInspector = inputValueDeserializer.getObjectInspector();
+ valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+ numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+ }
+
+ void initVectorBatches() throws HiveException {
+ inputBatch = batchContext.createVectorizedRowBatch();
+
+ // Create data buffers for value bytes column vectors.
+ for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+ ColumnVector colVector = inputBatch.cols[i];
+ if (colVector instanceof BytesColumnVector) {
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+ bytesColumnVector.initBuffer();
Review comment:
Actually, createVectorizedRowBatch() already calls init at the end (through reset); is there any particular reason we are doing this again here?
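If that observation holds, i.e. the batch returned by createVectorizedRowBatch() already has its BytesColumnVector buffers initialized, the explicit loop could simply be dropped (a sketch, contingent on that assumption):

    void initVectorBatches() throws HiveException {
      // Assumes createVectorizedRowBatch() already initializes BytesColumnVector
      // buffers via reset(); otherwise the initBuffer() loop is still required.
      inputBatch = batchContext.createVectorizedRowBatch();
      // ... rest of the method (deserializer and serializer setup) unchanged ...
    }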
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
+ protected final Configuration conf;
+ protected final TezCounter combineInputRecordsCounter;
+ protected final TezCounter combineOutputRecordsCounter;
+ VectorAggregateExpression[] aggregators;
+ VectorAggregationBufferRow aggregationBufferRow;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from
columns of a row
+ // in a vectorized row batch.
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite>
valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ protected transient ByteStream.Output valueOutput;
+ DataInputBuffer valueBytesWritable;
+
+ // Only required minimal configs are copied to the worker nodes. This hack
(file.) is
+ // done to include these configs to be copied to the worker node.
+ protected static String confPrefixForWorker = "file.";
+
+ VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+ int firstValueColumnOffset;
+ VectorizedRowBatchCtx batchContext = null;
+ protected int numValueCol = 0;
+ protected ReduceWork rw;
+ VectorizedRowBatch outputBatch = null;
+ VectorizedRowBatch inputBatch = null;
+ protected AbstractSerDe inputKeyDeserializer = null;
+ protected ObjectInspector keyObjectInspector = null;
+ protected ObjectInspector valueObjectInspector = null;
+ protected StructObjectInspector valueStructInspectors = null;
+ protected StructObjectInspector keyStructInspector = null;
+
+ public VectorGroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+
+ combineInputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combineOutputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+ conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+ rw = getReduceWork(taskContext);
+ if (rw == null) {
+ return;
+ }
+
+ if (rw.getReducer() instanceof VectorGroupByOperator) {
+ VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator)
rw.getReducer();
+ vectorGroupByOperator.initializeOp(this.conf);
+ this.aggregators = vectorGroupByOperator.getAggregators();
+ this.aggregationBufferRow = allocateAggregationBuffer();
+ batchContext = rw.getVectorizedRowBatchCtx();
+ if ((aggregators == null) || (aggregators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does" +
+ " not match with number of aggregators");
+ rw = null;
+ numValueCol = 0;
+ return;
+ }
+ }
+
+ try {
+ initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+ if (batchContext != null && numValueCol > 0) {
+ initVectorBatches();
+ }
+ } catch (SerDeException e) {
+ LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ protected static String getConfigPrefix(String destName) {
+ return confPrefixForWorker + destName;
+ }
+
+ // Get the reduce work from the config. Here some hack is used to prefix the
config name with
+ // "file." to avoid the config being filtered out.
+ private ReduceWork getReduceWork(TaskContext context) {
+ String destVertexName;
+ if (context instanceof TezOutputContextImpl) {
+ destVertexName =
((TezOutputContextImpl)context).getDestinationVertexName();
+ } else {
+ // As of now only map side combiner is supported.
+ return null;
+ }
+
+ String plan = conf.get(getConfigPrefix(destVertexName) +
+ HiveConf.ConfVars.PLAN.varname);
+ if (plan == null) {
+ LOG.info("Reduce plan is not set for vertex " + destVertexName);
+ return null;
+ }
+ this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+ if (conf.getBoolean(getConfigPrefix(destVertexName)
+ + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+ true)) {
+ Path planPath = new Path(plan);
+ planPath = new Path(planPath, REDUCE_PLAN_NAME);
+ String planString = conf.get(getConfigPrefix(destVertexName) +
+ planPath.toUri().getPath());
+ this.conf.set(planPath.toUri().getPath(), planString);
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+ } else {
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+ }
+ this.conf.set(HAS_REDUCE_WORK, "true");
+ this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+ return Utilities.getReduceWork(conf);
+ }
+
+ private void initObjectInspectors(TableDesc valueTableDesc,TableDesc
keyTableDesc)
+ throws SerDeException {
+ inputKeyDeserializer =
+ ReflectionUtils.newInstance(keyTableDesc.getSerDeClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties(), null);
+ keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ keyStructInspector = (StructObjectInspector) keyObjectInspector;
+ firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+ AbstractSerDe inputValueDeserializer = (AbstractSerDe)
ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc.getProperties(),
null);
+ valueObjectInspector = inputValueDeserializer.getObjectInspector();
+ valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+ numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+ }
+
+ void initVectorBatches() throws HiveException {
+ inputBatch = batchContext.createVectorizedRowBatch();
+
+ // Create data buffers for value bytes column vectors.
+ for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+ ColumnVector colVector = inputBatch.cols[i];
+ if (colVector instanceof BytesColumnVector) {
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+ bytesColumnVector.initBuffer();
+ }
+ }
+
+ batchValueDeserializer =
Review comment:
Split this into a separate deserializeRead and vectorDeserializeRow.
Also, VectorizedBatchUtil.typeInfosFromStructObjectInspector(valueStructInspectors) could be reused below.
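A sketch of that refactor, computing the value TypeInfo[] once and reusing it for both init calls (assumes TypeInfo from org.apache.hadoop.hive.serde2.typeinfo is imported):

    TypeInfo[] valueTypeInfos =
        VectorizedBatchUtil.typeInfosFromStructObjectInspector(valueStructInspectors);
    LazyBinaryDeserializeRead valueDeserializeRead =
        new LazyBinaryDeserializeRead(valueTypeInfos, true /* same flag as in the patch */);
    batchValueDeserializer = new VectorDeserializeRow<>(valueDeserializeRead);
    batchValueDeserializer.init(firstValueColumnOffset);
    // ... later, reuse the same array instead of recomputing it:
    valueVectorSerializeRow.init(valueTypeInfos, valueColumnMap);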
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
+ protected final Configuration conf;
+ protected final TezCounter combineInputRecordsCounter;
+ protected final TezCounter combineOutputRecordsCounter;
+ VectorAggregateExpression[] aggregators;
+ VectorAggregationBufferRow aggregationBufferRow;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from
columns of a row
+ // in a vectorized row batch.
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite>
valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ protected transient ByteStream.Output valueOutput;
+ DataInputBuffer valueBytesWritable;
+
+ // Only required minimal configs are copied to the worker nodes. This hack
(file.) is
+ // done to include these configs to be copied to the worker node.
+ protected static String confPrefixForWorker = "file.";
+
+ VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+ int firstValueColumnOffset;
+ VectorizedRowBatchCtx batchContext = null;
+ protected int numValueCol = 0;
+ protected ReduceWork rw;
+ VectorizedRowBatch outputBatch = null;
+ VectorizedRowBatch inputBatch = null;
+ protected AbstractSerDe inputKeyDeserializer = null;
+ protected ObjectInspector keyObjectInspector = null;
+ protected ObjectInspector valueObjectInspector = null;
+ protected StructObjectInspector valueStructInspectors = null;
+ protected StructObjectInspector keyStructInspector = null;
+
+ public VectorGroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+
+ combineInputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combineOutputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+ conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+ rw = getReduceWork(taskContext);
+ if (rw == null) {
+ return;
+ }
+
+ if (rw.getReducer() instanceof VectorGroupByOperator) {
+ VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator)
rw.getReducer();
+ vectorGroupByOperator.initializeOp(this.conf);
+ this.aggregators = vectorGroupByOperator.getAggregators();
+ this.aggregationBufferRow = allocateAggregationBuffer();
+ batchContext = rw.getVectorizedRowBatchCtx();
+ if ((aggregators == null) || (aggregators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does" +
+ " not match with number of aggregators");
+ rw = null;
+ numValueCol = 0;
+ return;
+ }
+ }
+
+ try {
+ initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+ if (batchContext != null && numValueCol > 0) {
+ initVectorBatches();
+ }
+ } catch (SerDeException e) {
+ LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ protected static String getConfigPrefix(String destName) {
+ return confPrefixForWorker + destName;
+ }
+
+ // Get the reduce work from the config. Here some hack is used to prefix the
config name with
+ // "file." to avoid the config being filtered out.
+ private ReduceWork getReduceWork(TaskContext context) {
+ String destVertexName;
+ if (context instanceof TezOutputContextImpl) {
+ destVertexName =
((TezOutputContextImpl)context).getDestinationVertexName();
+ } else {
+ // As of now only map side combiner is supported.
+ return null;
+ }
+
+ String plan = conf.get(getConfigPrefix(destVertexName) +
+ HiveConf.ConfVars.PLAN.varname);
+ if (plan == null) {
+ LOG.info("Reduce plan is not set for vertex " + destVertexName);
+ return null;
+ }
+ this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+ if (conf.getBoolean(getConfigPrefix(destVertexName)
+ + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+ true)) {
+ Path planPath = new Path(plan);
+ planPath = new Path(planPath, REDUCE_PLAN_NAME);
+ String planString = conf.get(getConfigPrefix(destVertexName) +
+ planPath.toUri().getPath());
+ this.conf.set(planPath.toUri().getPath(), planString);
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+ } else {
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+ }
+ this.conf.set(HAS_REDUCE_WORK, "true");
+ this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+ return Utilities.getReduceWork(conf);
+ }
+
+ private void initObjectInspectors(TableDesc valueTableDesc,TableDesc
keyTableDesc)
+ throws SerDeException {
+ inputKeyDeserializer =
+ ReflectionUtils.newInstance(keyTableDesc.getSerDeClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties(), null);
+ keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ keyStructInspector = (StructObjectInspector) keyObjectInspector;
+ firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+ AbstractSerDe inputValueDeserializer = (AbstractSerDe)
ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc.getProperties(),
null);
+ valueObjectInspector = inputValueDeserializer.getObjectInspector();
+ valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+ numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+ }
+
+ void initVectorBatches() throws HiveException {
+ inputBatch = batchContext.createVectorizedRowBatch();
+
+ // Create data buffers for value bytes column vectors.
+ for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+ ColumnVector colVector = inputBatch.cols[i];
+ if (colVector instanceof BytesColumnVector) {
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+ bytesColumnVector.initBuffer();
+ }
+ }
+
+ batchValueDeserializer =
+ new VectorDeserializeRow<>(
+ new LazyBinaryDeserializeRead(
+
VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+ valueStructInspectors),
+ true));
+ batchValueDeserializer.init(firstValueColumnOffset);
+
+ int[] valueColumnMap = new int[numValueCol];
+ for (int i = 0; i < numValueCol; i++) {
+ valueColumnMap[i] = i + firstValueColumnOffset;
+ }
+
+ valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(numValueCol);
+ valueVectorSerializeRow = new
VectorSerializeRow<>(valueLazyBinarySerializeWrite);
+
valueVectorSerializeRow.init(VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+ valueStructInspectors), valueColumnMap);
+ valueOutput = new ByteStream.Output();
+ valueVectorSerializeRow.setOutput(valueOutput);
+ outputBatch = batchContext.createVectorizedRowBatch();
+ valueBytesWritable = new DataInputBuffer();
+ }
+
+ private VectorAggregationBufferRow allocateAggregationBuffer() throws
HiveException {
+ VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+ new
VectorAggregateExpression.AggregationBuffer[aggregators.length];
+ for (int i=0; i < aggregators.length; ++i) {
+ aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+ aggregators[i].reset(aggregationBuffers[i]);
+ }
+ return new VectorAggregationBufferRow(aggregationBuffers);
+ }
+
+ private void finishAggregation(DataInputBuffer key, IFile.Writer writer,
boolean needFlush)
+ throws HiveException, IOException {
+ for (int i = 0; i < aggregators.length; ++i) {
+ try {
+
aggregators[i].aggregateInput(aggregationBufferRow.getAggregationBuffer(i),
inputBatch);
+ } catch (HiveException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ // In case the input batch is full but the keys are still same we need not
flush.
+ // Only evaluate the aggregates and store it in the aggregationBufferRow.
The aggregate
+ // functions are incremental and will take care of correctness when next
batch comes for
+ // aggregation.
+ if (!needFlush) {
+ return;
+ }
+
+ int colNum = firstValueColumnOffset;
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].assignRowColumn(outputBatch, 0, colNum++,
+ aggregationBufferRow.getAggregationBuffer(i));
+ }
+
+ valueLazyBinarySerializeWrite.reset();
+ valueVectorSerializeRow.serializeWrite(outputBatch, 0);
+ valueBytesWritable.reset(valueOutput.getData(), 0,
valueOutput.getLength());
+ writer.append(key, valueBytesWritable);
+ combineOutputRecordsCounter.increment(1);
+ aggregationBufferRow.reset();
+ outputBatch.reset();
+ }
+
+ private void addValueToBatch(DataInputBuffer val, DataInputBuffer key,
+ IFile.Writer writer, boolean needFLush) throws IOException, HiveException {
+ batchValueDeserializer.setBytes(val.getData(), val.getPosition(),
+ val.getLength() - val.getPosition());
+ batchValueDeserializer.deserialize(inputBatch, inputBatch.size);
+ inputBatch.size++;
+ if (needFLush || (inputBatch.size >= VectorizedRowBatch.DEFAULT_SIZE)) {
+ processVectorGroup(key, writer, needFLush);
+ }
+ }
+
+ private void processVectorGroup(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+ throws HiveException {
+ try {
+ finishAggregation(key, writer, needFlush);
+ inputBatch.reset();
+ } catch (Exception e) {
+ String rowString;
+ try {
+ rowString = inputBatch.toString();
+ } catch (Exception e2) {
+ rowString = "[Error getting row data with exception "
+ + StringUtils.stringifyException(e2) + " ]";
+ }
+ LOG.error("Hive Runtime Error while processing vector batch" +
rowString, e);
+ throw new HiveException("Hive Runtime Error while processing vector
batch", e);
+ }
+ }
+
+ protected void appendDirectlyToWriter(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ long numRows = 0;
+ try {
+ do {
+ numRows++;
+ writer.append(rawIter.getKey(), rawIter.getValue());
+ } while (rawIter.next());
+ combineInputRecordsCounter.increment(numRows);
+ combineOutputRecordsCounter.increment(numRows);
+ } catch(IOException e) {
+ LOG.error("Append to writer failed", e);
Review comment:
Log or throw (not both)
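A minimal sketch of the pattern being asked for (names are hypothetical, not from this PR): propagate the exception once with its cause attached and leave logging to a single top-level handler, instead of logging in the catch block and then also rethrowing.

    import java.io.IOException;

    // Sketch only: "throw, don't also log" -- propagate once with the cause preserved.
    public class RethrowSketch {
      static void appendAll(Appendable out, Iterable<String> rows) {
        try {
          for (String row : rows) {
            out.append(row).append('\n');
          }
        } catch (IOException e) {
          // No LOG.error here; the caller (or a top-level handler) logs it exactly once.
          throw new RuntimeException("Append to writer failed", e);
        }
      }
    }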
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
Review comment:
getName() not needed -- and
org.apache.hadoop.hive.ql.exec.GroupByCombiner -> GroupByCombiner
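A one-field sketch of the simplified declaration, assuming the standard slf4j class-literal overload (the class name below is a placeholder):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggerSketch {
      // getLogger(Class) derives the logger name itself, so .getName() and the fully
      // qualified class reference are unnecessary.
      private static final Logger LOG = LoggerFactory.getLogger(LoggerSketch.class);
    }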
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the
partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ AbstractSerDe valueSerializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+ DataInputBuffer prevKey;
+ BytesWritable valWritable;
+ DataInputBuffer prevVal;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+
ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length
!= numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does"
+
+ " not match with number of aggregators");
+ numValueCol = 0;
+ rw = null;
+ return;
+ }
+ valueSerializer = (AbstractSerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ valueSerializer.initialize(null,
+ valueTableDesc.getProperties(), null);
+
+ aggrObjectInspector = groupByOperator.getAggrObjInspector();
+ valueBuffer = new DataInputBuffer();
+ cachedValues = new Object[aggregationEvaluators.length];
+ prevKey = new DataInputBuffer();
+ valWritable = new BytesWritable();
+ prevVal = new DataInputBuffer();
+ } catch (Exception e) {
+ LOG.error(" GroupByCombiner failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ }
+
+ private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+ throws Exception {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+ }
+ BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+ aggrObjectInspector);
+ valueBuffer.reset(result.getBytes(), result.getLength());
+ writer.append(key, valueBuffer);
+ combineOutputRecordsCounter.increment(1);
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ aggregationEvaluators[i].reset(aggregationBuffers[i]);
+ }
+ }
+
+ private void updateAggregation()
+ throws HiveException, SerDeException {
+ valWritable.set(prevVal.getData(), prevVal.getPosition(),
+ prevVal.getLength() - prevVal.getPosition());
+ Object row = valueSerializer.deserialize(valWritable);
+ groupByOperator.updateAggregation(row);
+ }
+
+ private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ long numRows = 0;
+ try {
+ DataInputBuffer key = rawIter.getKey();
+ prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+ prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+ int numSameKey = 0;
+ while (rawIter.next()) {
+ key = rawIter.getKey();
+ if (!VectorGroupByCombiner.equals(key, prevKey)) {
+ // if current key is not equal to the previous key then we have to emit the
+ // record. In case only one record was present for this key, then no need to
+ // do aggregation, We can directly append the key and value. For key with more
+ // than one record, we have to update the aggregation for the current value only
+ // as for previous values (records) aggregation is already done in previous
+ // iteration of loop.
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ prevKey.reset(key.getData(), key.getPosition(),
+ key.getLength() - key.getPosition());
+ numSameKey = 0;
+ } else {
+ // If there are more than one record with same key then update the aggregation.
+ updateAggregation();
+ numSameKey++;
+ }
+ prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+ numRows++;
+ }
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ combineInputRecordsCounter.increment(numRows);
+ } catch(Exception e) {
+ LOG.error("processRows failed", e);
Review comment:
log or throw (not both)
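As a hedged illustration of the same point for this catch block (the helper names are invented): pick one of the two, and if the choice is to throw, wrap the exception itself rather than e.getMessage() so the original cause and stack trace survive.

    import java.util.concurrent.Callable;

    class PropagateOnceSketch {
      static <T> T runCombinerStep(Callable<T> step) {
        try {
          return step.call();
        } catch (Exception e) {
          // Either log here and swallow, or (as below) rethrow and let the caller log.
          // Wrapping e (not e.getMessage()) keeps the original exception as the cause.
          throw new RuntimeException("processRows failed", e);
        }
      }
    }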
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the
partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ AbstractSerDe valueSerializer;
Review comment:
make fields private and final where possible
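A small sketch of the tightened declarations (field names are borrowed for illustration only; final applies where a field is assigned exactly once):

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.DataInputBuffer;

    class FieldVisibilitySketch {
      // Package-private mutable fields become private, and final where they are
      // initialized once and never reassigned.
      private final DataInputBuffer prevKey = new DataInputBuffer();
      private final DataInputBuffer prevVal = new DataInputBuffer();
      private final BytesWritable valWritable = new BytesWritable();
    }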
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
Review comment:
as above -- getName not needed
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
+ protected final Configuration conf;
+ protected final TezCounter combineInputRecordsCounter;
+ protected final TezCounter combineOutputRecordsCounter;
+ VectorAggregateExpression[] aggregators;
+ VectorAggregationBufferRow aggregationBufferRow;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from
columns of a row
+ // in a vectorized row batch.
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite>
valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ protected transient ByteStream.Output valueOutput;
+ DataInputBuffer valueBytesWritable;
+
+ // Only required minimal configs are copied to the worker nodes. This hack
(file.) is
+ // done to include these configs to be copied to the worker node.
+ protected static String confPrefixForWorker = "file.";
+
+ VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+ int firstValueColumnOffset;
+ VectorizedRowBatchCtx batchContext = null;
+ protected int numValueCol = 0;
+ protected ReduceWork rw;
+ VectorizedRowBatch outputBatch = null;
+ VectorizedRowBatch inputBatch = null;
+ protected AbstractSerDe inputKeyDeserializer = null;
+ protected ObjectInspector keyObjectInspector = null;
+ protected ObjectInspector valueObjectInspector = null;
+ protected StructObjectInspector valueStructInspectors = null;
+ protected StructObjectInspector keyStructInspector = null;
+
+ public VectorGroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+
+ combineInputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combineOutputRecordsCounter =
+
taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+ conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+ rw = getReduceWork(taskContext);
+ if (rw == null) {
+ return;
+ }
+
+ if (rw.getReducer() instanceof VectorGroupByOperator) {
+ VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator)
rw.getReducer();
+ vectorGroupByOperator.initializeOp(this.conf);
+ this.aggregators = vectorGroupByOperator.getAggregators();
+ this.aggregationBufferRow = allocateAggregationBuffer();
+ batchContext = rw.getVectorizedRowBatchCtx();
+ if ((aggregators == null) || (aggregators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does" +
+ " not match with number of aggregators");
+ rw = null;
+ numValueCol = 0;
+ return;
+ }
+ }
+
+ try {
+ initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+ if (batchContext != null && numValueCol > 0) {
+ initVectorBatches();
+ }
+ } catch (SerDeException e) {
+ LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ protected static String getConfigPrefix(String destName) {
+ return confPrefixForWorker + destName;
+ }
+
+ // Get the reduce work from the config. Here some hack is used to prefix the
config name with
+ // "file." to avoid the config being filtered out.
+ private ReduceWork getReduceWork(TaskContext context) {
+ String destVertexName;
+ if (context instanceof TezOutputContextImpl) {
+ destVertexName =
((TezOutputContextImpl)context).getDestinationVertexName();
+ } else {
+ // As of now only map side combiner is supported.
+ return null;
+ }
+
+ String plan = conf.get(getConfigPrefix(destVertexName) +
+ HiveConf.ConfVars.PLAN.varname);
+ if (plan == null) {
+ LOG.info("Reduce plan is not set for vertex " + destVertexName);
+ return null;
+ }
+ this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+ if (conf.getBoolean(getConfigPrefix(destVertexName)
+ + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+ true)) {
+ Path planPath = new Path(plan);
+ planPath = new Path(planPath, REDUCE_PLAN_NAME);
+ String planString = conf.get(getConfigPrefix(destVertexName) +
+ planPath.toUri().getPath());
+ this.conf.set(planPath.toUri().getPath(), planString);
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+ } else {
+ this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+ }
+ this.conf.set(HAS_REDUCE_WORK, "true");
+ this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+ return Utilities.getReduceWork(conf);
+ }
+
+ private void initObjectInspectors(TableDesc valueTableDesc,TableDesc
keyTableDesc)
+ throws SerDeException {
+ inputKeyDeserializer =
+ ReflectionUtils.newInstance(keyTableDesc.getSerDeClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties(), null);
+ keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ keyStructInspector = (StructObjectInspector) keyObjectInspector;
+ firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+ AbstractSerDe inputValueDeserializer = (AbstractSerDe)
ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc.getProperties(),
null);
+ valueObjectInspector = inputValueDeserializer.getObjectInspector();
+ valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+ numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+ }
+
+ void initVectorBatches() throws HiveException {
+ inputBatch = batchContext.createVectorizedRowBatch();
+
+ // Create data buffers for value bytes column vectors.
+ for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+ ColumnVector colVector = inputBatch.cols[i];
+ if (colVector instanceof BytesColumnVector) {
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+ bytesColumnVector.initBuffer();
Review comment:
Believe init() is safer as other CV types might need some kind of
initialisation too
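A sketch of the alternative being suggested, under the assumption that ColumnVector.init() is the generic per-type setup hook (BytesColumnVector's override allocates its buffers, and other vector types can do their own work):

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    class InitValueColumnsSketch {
      // Call init() on every value column instead of special-casing BytesColumnVector.
      static void initValueColumns(VectorizedRowBatch batch, int firstValueColumnOffset) {
        for (int i = firstValueColumnOffset; i < batch.numCols; i++) {
          ColumnVector colVector = batch.cols[i];
          colVector.init();
        }
      }
    }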
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the
partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ AbstractSerDe valueSerializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+ DataInputBuffer prevKey;
+ BytesWritable valWritable;
+ DataInputBuffer prevVal;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+
ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length
!= numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does"
+
+ " not match with number of aggregators");
+ numValueCol = 0;
+ rw = null;
+ return;
+ }
+ valueSerializer = (AbstractSerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ valueSerializer.initialize(null,
+ valueTableDesc.getProperties(), null);
+
+ aggrObjectInspector = groupByOperator.getAggrObjInspector();
+ valueBuffer = new DataInputBuffer();
+ cachedValues = new Object[aggregationEvaluators.length];
+ prevKey = new DataInputBuffer();
+ valWritable = new BytesWritable();
+ prevVal = new DataInputBuffer();
+ } catch (Exception e) {
+ LOG.error(" GroupByCombiner failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ }
+
+ private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+ throws Exception {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ cachedValues[i] =
aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+ }
+ BytesWritable result = (BytesWritable)
valueSerializer.serialize(cachedValues,
+ aggrObjectInspector);
+ valueBuffer.reset(result.getBytes(), result.getLength());
+ writer.append(key, valueBuffer);
+ combineOutputRecordsCounter.increment(1);
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ aggregationEvaluators[i].reset(aggregationBuffers[i]);
+ }
+ }
+
+ private void updateAggregation()
+ throws HiveException, SerDeException {
+ valWritable.set(prevVal.getData(), prevVal.getPosition(),
+ prevVal.getLength() - prevVal.getPosition());
+ Object row = valueSerializer.deserialize(valWritable);
+ groupByOperator.updateAggregation(row);
+ }
+
+ private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer
writer) {
+ long numRows = 0;
+ try {
+ DataInputBuffer key = rawIter.getKey();
+ prevKey.reset(key.getData(), key.getPosition(), key.getLength() -
key.getPosition());
+ prevVal.reset(rawIter.getValue().getData(),
rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() -
rawIter.getValue().getPosition());
+ int numSameKey = 0;
+ while (rawIter.next()) {
+ key = rawIter.getKey();
+ if (!VectorGroupByCombiner.equals(key, prevKey)) {
+ // if current key is not equal to the previous key then we have to
emit the
+ // record. In case only one record was present for this key, then no
need to
+ // do aggregation, We can directly append the key and value. For key
with more
+ // than one record, we have to update the aggregation for the
current value only
+ // as for previous values (records) aggregation is already done in
previous
+ // iteration of loop.
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ prevKey.reset(key.getData(), key.getPosition(),
+ key.getLength() - key.getPosition());
+ numSameKey = 0;
+ } else {
+ // If there are more than one record with same key then update the
aggregation.
+ updateAggregation();
+ numSameKey++;
+ }
+ prevVal.reset(rawIter.getValue().getData(),
rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() -
rawIter.getValue().getPosition());
+ numRows++;
+ }
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ combineInputRecordsCounter.increment(numRows);
+ } catch(Exception e) {
+ LOG.error("processRows failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ try {
+ if (!rawIter.next()) {
+ return;
+ }
+ if (numValueCol == 0) {
+ // For no aggregation, RLE in writer will take care of reduction.
+ appendDirectlyToWriter(rawIter, writer);
+ } else {
+ processRows(rawIter, writer);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to combine rows", e);
Review comment:
log or throw (not both)
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the
partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ AbstractSerDe valueSerializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+ DataInputBuffer prevKey;
+ BytesWritable valWritable;
+ DataInputBuffer prevVal;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException,
IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+
ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length
!= numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to
extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does"
+
+ " not match with number of aggregators");
+ numValueCol = 0;
+ rw = null;
+ return;
+ }
+ valueSerializer = (AbstractSerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ valueSerializer.initialize(null,
+ valueTableDesc.getProperties(), null);
+
+ aggrObjectInspector = groupByOperator.getAggrObjInspector();
+ valueBuffer = new DataInputBuffer();
+ cachedValues = new Object[aggregationEvaluators.length];
+ prevKey = new DataInputBuffer();
+ valWritable = new BytesWritable();
+ prevVal = new DataInputBuffer();
+ } catch (Exception e) {
+ LOG.error(" GroupByCombiner failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ }
+
+ private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+ throws Exception {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ cachedValues[i] =
aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+ }
+ BytesWritable result = (BytesWritable)
valueSerializer.serialize(cachedValues,
+ aggrObjectInspector);
+ valueBuffer.reset(result.getBytes(), result.getLength());
+ writer.append(key, valueBuffer);
+ combineOutputRecordsCounter.increment(1);
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ aggregationEvaluators[i].reset(aggregationBuffers[i]);
+ }
+ }
+
+ private void updateAggregation()
+ throws HiveException, SerDeException {
+ valWritable.set(prevVal.getData(), prevVal.getPosition(),
+ prevVal.getLength() - prevVal.getPosition());
+ Object row = valueSerializer.deserialize(valWritable);
+ groupByOperator.updateAggregation(row);
+ }
+
+ private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer
writer) {
+ long numRows = 0;
+ try {
+ DataInputBuffer key = rawIter.getKey();
+ prevKey.reset(key.getData(), key.getPosition(), key.getLength() -
key.getPosition());
+ prevVal.reset(rawIter.getValue().getData(),
rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() -
rawIter.getValue().getPosition());
+ int numSameKey = 0;
+ while (rawIter.next()) {
+ key = rawIter.getKey();
+ if (!VectorGroupByCombiner.equals(key, prevKey)) {
+ // if current key is not equal to the previous key then we have to
emit the
+ // record. In case only one record was present for this key, then no
need to
+ // do aggregation, We can directly append the key and value. For key
with more
+ // than one record, we have to update the aggregation for the
current value only
+ // as for previous values (records) aggregation is already done in
previous
+ // iteration of loop.
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ prevKey.reset(key.getData(), key.getPosition(),
+ key.getLength() - key.getPosition());
+ numSameKey = 0;
+ } else {
+ // If there are more than one record with same key then update the
aggregation.
+ updateAggregation();
+ numSameKey++;
+ }
+ prevVal.reset(rawIter.getValue().getData(),
rawIter.getValue().getPosition(),
+ rawIter.getValue().getLength() -
rawIter.getValue().getPosition());
+ numRows++;
+ }
+ if (numSameKey != 0) {
+ updateAggregation();
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ combineInputRecordsCounter.increment(numRows);
+ } catch(Exception e) {
+ LOG.error("processRows failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ try {
+ if (!rawIter.next()) {
+ return;
+ }
+ if (numValueCol == 0) {
+ // For no aggregation, RLE in writer will take care of reduction.
+ appendDirectlyToWriter(rawIter, writer);
+ } else {
+ processRows(rawIter, writer);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to combine rows", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ public static JobConf setCombinerInConf(BaseWork dest, JobConf conf, JobConf
destConf) {
+ if (conf == null || !HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY)) {
+ return conf;
+ }
+
+ // As of now, this function is called for all edges. We are interested
only on edges
+ // to reducer.
+ if (!(dest instanceof ReduceWork)) {
+ return conf;
+ }
+ ReduceWork rw = (ReduceWork)dest;
+ Operator<?> reducer = rw.getReducer();
+ String combinerClass;
+
+ // In case of merge partial only we can have the combiner. In case of
reduce side
+ // aggregation, the mode will be COMPLETE and reducer expects the records
to be raw,
+ // not partially aggregated. For example, if its count, then reducer will
not do a
+ // sum. Instead it will count the number of input records. This will
result into
+ // invalid result if combiner has aggregated the records already.
+ if (reducer instanceof VectorGroupByOperator) {
+ VectorGroupByOperator operator = (VectorGroupByOperator) reducer;
+ if (operator.getConf().getMode() != GroupByDesc.Mode.MERGEPARTIAL) {
+ LOG.info("Combiner not set as operator is not MERGEPARTIAL" +
reducer.toString());
+ return conf;
+ }
+ combinerClass = VectorGroupByCombiner.class.getName();
+ } else if (reducer instanceof GroupByOperator) {
+ GroupByOperator operator = (GroupByOperator) reducer;
+ if (operator.getConf().getMode() != GroupByDesc.Mode.MERGEPARTIAL) {
+ LOG.info("Combiner not set as operator is not MERGEPARTIAL" +
reducer.toString());
+ return conf;
+ }
+ combinerClass = GroupByCombiner.class.getName();
+ } else {
+ LOG.info("Combiner not set as operator is not GroupByOperator" +
reducer.toString());
+ return conf;
+ }
+
+ conf = new JobConf(conf);
+ String plan = HiveConf.getVar(destConf, HiveConf.ConfVars.PLAN);
+
+ // One source vertex can have multiple destination vertex. Add the
destination vertex name
+ // to the config variable name to distinguish the plan of one reducer from
other.
+ conf.set(getConfigPrefix(dest.getName()) + HiveConf.ConfVars.PLAN.varname,
plan);
+ conf.set(getConfigPrefix(dest.getName()) + HAS_REDUCE_WORK, "true");
+
+ // Config with tez.runtime prefix are already part of included config.
+ conf.set("tez.runtime.combiner.class", combinerClass);
+
+ // This is adding the reduce work (destination node) plan path in the config for source map node.
+ // In the combiner we use the aggregation functions used at the reducer to aggregate
+ // the records. The mapper node (plan for map work) will not have these info so we will get
+ // this from reduce plan.
+ //TODO Need to check if we can get the aggregate info from map plan if its map side
Review comment:
ideally resolve TODO or replace with a ticket?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
+ protected final Configuration conf;
+ protected final TezCounter combineInputRecordsCounter;
+ protected final TezCounter combineOutputRecordsCounter;
+ VectorAggregateExpression[] aggregators;
+ VectorAggregationBufferRow aggregationBufferRow;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from
columns of a row
+ // in a vectorized row batch.
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite>
valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ protected transient ByteStream.Output valueOutput;
+ DataInputBuffer valueBytesWritable;
+
+ // Only required minimal configs are copied to the worker nodes. This hack (file.) is
+ // done to include these configs to be copied to the worker node.
+ protected static String confPrefixForWorker = "file.";
Review comment:
not sure why we need a separate prefix for this -- the plan with the
conf would already be on the workers right?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for vectorized group by operator. In case of map side aggregate,
the partially
+// aggregated records are sorted based on group by key. If because of some
reasons, like hash
+// table memory exceeded the limit or the first few batches of records have
less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as
the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ VectorGroupByCombiner.class.getName());
+ protected final Configuration conf;
+ protected final TezCounter combineInputRecordsCounter;
+ protected final TezCounter combineOutputRecordsCounter;
+ VectorAggregateExpression[] aggregators;
+ VectorAggregationBufferRow aggregationBufferRow;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from
columns of a row
+ // in a vectorized row batch.
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite>
valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ protected transient ByteStream.Output valueOutput;
+ DataInputBuffer valueBytesWritable;
+
+ // Only required minimal configs are copied to the worker nodes. This hack
(file.) is
+ // done to include these configs to be copied to the worker node.
+ protected static String confPrefixForWorker = "file.";
+
+ VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+ int firstValueColumnOffset;
+ VectorizedRowBatchCtx batchContext = null;
+ protected int numValueCol = 0;
+ protected ReduceWork rw;
+ VectorizedRowBatch outputBatch = null;
+ VectorizedRowBatch inputBatch = null;
+ protected AbstractSerDe inputKeyDeserializer = null;
+ protected ObjectInspector keyObjectInspector = null;
+ protected ObjectInspector valueObjectInspector = null;
+ protected StructObjectInspector valueStructInspectors = null;
+ protected StructObjectInspector keyStructInspector = null;
+
+ public VectorGroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+ super(taskContext);
+
+ combineInputRecordsCounter =
+     taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combineOutputRecordsCounter =
+     taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+ conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+ rw = getReduceWork(taskContext);
+ if (rw == null) {
+ return;
+ }
+
+ if (rw.getReducer() instanceof VectorGroupByOperator) {
+ VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator) rw.getReducer();
+ vectorGroupByOperator.initializeOp(this.conf);
+ this.aggregators = vectorGroupByOperator.getAggregators();
+ this.aggregationBufferRow = allocateAggregationBuffer();
+ batchContext = rw.getVectorizedRowBatchCtx();
+ if ((aggregators == null) || (aggregators.length != numValueCol)) {
+ // TODO: Need to support distinct. The logic has to be changed to extract only
Review comment:
Please point to the corresponding follow-up JIRA ticket in this TODO.
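
For readers less familiar with the pattern, the class comment above boils down to a single
pass over key-sorted input: equal keys are adjacent, so one running buffer per key is enough.
A minimal, self-contained sketch of that idea (hypothetical names, with a plain long sum
standing in for Hive's aggregation buffers and evaluators; this is not code from the patch):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical illustration: combine partial aggregates over key-sorted input.
    // Because equal keys are adjacent, one running buffer per key suffices.
    public final class SortedCombineSketch {

      public static List<Map.Entry<String, Long>> combine(List<Map.Entry<String, Long>> sortedInput) {
        List<Map.Entry<String, Long>> combined = new ArrayList<>();
        String currentKey = null;
        long partial = 0L;                               // stands in for the aggregation buffer
        for (Map.Entry<String, Long> kv : sortedInput) {
          if (currentKey != null && !currentKey.equals(kv.getKey())) {
            combined.add(new SimpleEntry<>(currentKey, partial));   // flush the finished group
            partial = 0L;
          }
          currentKey = kv.getKey();
          partial += kv.getValue();                      // merge this partial value into the group
        }
        if (currentKey != null) {
          combined.add(new SimpleEntry<>(currentKey, partial));     // flush the last group
        }
        return combined;
      }
    }

The actual combiner works on serialized keys and values and delegates the merge to the
configured aggregators, but the overall control flow is analogous.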
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/FastByteComparisons.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import com.google.common.primitives.UnsignedBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+abstract public class FastByteComparisons {
Review comment:
Is this a copy of org.apache.hadoop.io.FastByteComparisons? If so, can we reuse that class instead of duplicating it?
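
If it really is plain lexicographic unsigned-byte comparison, one alternative to copying the
Hadoop class (and to the sun.misc.Unsafe dependency) would be to lean on Guava's UnsignedBytes,
which the new file already imports. A rough sketch, without the Unsafe word-at-a-time fast path
of the original, so likely slower on long keys (class and method names here are hypothetical):

    import com.google.common.primitives.UnsignedBytes;

    // Hypothetical alternative: lexicographic comparison of two byte ranges,
    // treating each byte as unsigned. No Unsafe, hence no word-at-a-time fast path.
    public final class SimpleByteComparisons {

      private SimpleByteComparisons() {
      }

      public static int compareTo(byte[] b1, int off1, int len1, byte[] b2, int off2, int len2) {
        int common = Math.min(len1, len2);
        for (int i = 0; i < common; i++) {
          int cmp = UnsignedBytes.compare(b1[off1 + i], b2[off2 + i]);
          if (cmp != 0) {
            return cmp;
          }
        }
        return len1 - len2;   // when all common bytes match, the shorter range sorts first
      }
    }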
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for the regular (non-vectorized) group by operator. With map-side aggregation, the
+// partially aggregated records are sorted on the group by key. If the aggregation was skipped
+// for some reason, for example because the hash table exceeded its memory limit or the first
+// few batches of records had few distinct values (NDVs), the combiner can finish the
+// aggregation cheaply here because the records are already sorted on the group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ AbstractSerDe valueSerializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+ DataInputBuffer prevKey;
+ BytesWritable valWritable;
+ DataInputBuffer prevVal;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+     ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+         ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+ // TODO: Need to support distinct. The logic has to be changed to extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does"
+     + " not match with number of aggregators");
+ numValueCol = 0;
+ rw = null;
+ return;
+ }
+ valueSerializer = (AbstractSerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getSerDeClass(), null);
+ valueSerializer.initialize(null,
+ valueTableDesc.getProperties(), null);
+
+ aggrObjectInspector = groupByOperator.getAggrObjInspector();
+ valueBuffer = new DataInputBuffer();
+ cachedValues = new Object[aggregationEvaluators.length];
+ prevKey = new DataInputBuffer();
+ valWritable = new BytesWritable();
+ prevVal = new DataInputBuffer();
+ } catch (Exception e) {
+ LOG.error(" GroupByCombiner failed", e);
Review comment:
Log or throw, but not both; avoid swallowing the exception here after logging it.
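
One way to address this, assuming it is acceptable to propagate the failure (the constructor
already declares HiveException):

    } catch (Exception e) {
      // Rethrow instead of swallowing the failure after logging it; wrapping in
      // HiveException keeps the constructor's declared signature.
      throw new HiveException("GroupByCombiner initialization failed", e);
    }

Alternatively, if silently falling back to running without the combiner is the intended
behavior here, a LOG.warn describing the fallback plus the existing "rw = null; numValueCol = 0;"
disabling pattern would make that intent explicit.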
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]