Author: cheolsoo
Date: Fri Dec 12 20:15:26 2014
New Revision: 1645056
URL: http://svn.apache.org/r1645056
Log:
PIG-4066: An optimization for ROLLUP operation in Pig (cheolsoo)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LORollupHIIForEach.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/RollupHIIOptimizer.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/PigConstants.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
pig/trunk/src/org/apache/pig/builtin/RollupDimensions.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCube.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java
pig/trunk/src/org/apache/pig/parser/AliasMasker.g
pig/trunk/src/org/apache/pig/parser/AstPrinter.g
pig/trunk/src/org/apache/pig/parser/AstValidator.g
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
pig/trunk/src/org/apache/pig/parser/QueryLexer.g
pig/trunk/src/org/apache/pig/parser/QueryParser.g
pig/trunk/test/org/apache/pig/test/TestCubeOperator.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Dec 12 20:15:26 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4066: An optimization for ROLLUP operation in Pig (cheolsoo)
+
PIG-4333: Split BigData tests into multiple groups (rohini)
BUG FIXES
Modified: pig/trunk/src/org/apache/pig/Main.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Fri Dec 12 20:15:26 2014
@@ -895,6 +895,7 @@ public class Main {
System.out.println(" GroupByConstParallelSetter - Force
parallel 1 for \"group all\" statement");
System.out.println(" PartitionFilterOptimizer -
Pushdown partition filter conditions to loader implementing LoadMetaData");
System.out.println(" PredicatePushdownOptimizer -
Pushdown filter predicates to loader implementing LoadPredicatePushDown");
+ System.out.println(" RollupHIIOptimizer - Apply Rollup
HII optimization");
System.out.println(" All - Disable all optimizations");
System.out.println(" All optimizations listed here are
enabled by default. Optimization values are case insensitive.");
System.out.println(" -v, -verbose - Print all error messages to
screen");
Modified: pig/trunk/src/org/apache/pig/PigConstants.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConstants.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConstants.java (original)
+++ pig/trunk/src/org/apache/pig/PigConstants.java Fri Dec 12 20:15:26 2014
@@ -59,4 +59,45 @@ public class PigConstants {
public static final String TIME_UDFS_ELAPSED_TIME_COUNTER =
"approx_microsecs";
public static final String TASK_INDEX = "mapreduce.task.index";
+
+ /**
+ * This parameter is used to check if the rollup is optimizable or not
after going
+ * through the RollupHIIOptimizer
+ */
+ public static final String PIG_HII_ROLLUP_OPTIMIZABLE =
"pig.hii.rollup.optimizable";
+
+ /**
+ * This parameter stores the value of the pivot position. If the rollup is
not optimizable
+ * this value will be -1; If the rollup is optimizable: if the user did
specify the pivot
+ * in the rollup clause, this parameter will get that value; if the user
did not specify
+ * the pivot in the rollup clause, this parameter will get the value of
the median position
+ * of the fields in the rollup clause
+ */
+ public static final String PIG_HII_ROLLUP_PIVOT = "pig.hii.rollup.pivot";
+
+ /**
+ * This parameter stores the index of the first field involved in the
rollup (or the first field
+ * involved in the rollup after changing the position of the rollup to the end
in the case of having a cube)
+ */
+ public static final String PIG_HII_ROLLUP_FIELD_INDEX =
"pig.hii.rollup.field.index";
+
+ /**
+ * This parameter stores the index of the first field involved in the
rollup before
+ * changing the position of the rollup to the end in the case of having a cube
+ */
+ public static final String PIG_HII_ROLLUP_OLD_FIELD_INDEX =
"pig.hii.rollup.old.field.index";
+
+ /**
+ * This parameter stores the total number of fields involved in the
CUBE clause. For example, we
+ * have two CUBE clauses:
+ * B = CUBE A BY CUBE(year, month, day), ROLLUP(hour, minute, second);
+ * B = CUBE A BY ROLLUP(year, month, day, hour, minute, second);
+ * So this parameter will be 6 at both cases.
+ */
+ public static final String PIG_HII_NUMBER_TOTAL_FIELD =
"pig.hii.number.total.field";
+
+ /**
+ * This parameter stores the number of algebraic functions that are used after
the rollup.
+ */
+ public static final String PIG_HII_NUMBER_ALGEBRAIC =
"pig.hii.number.algebraic";
}
\ No newline at end of file
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Fri Dec 12 20:15:26 2014
@@ -66,12 +66,14 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
import org.apache.pig.OverwritableStoreFunc;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.RollupHIIPartitioner;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
@@ -183,6 +185,8 @@ public class JobControlCompiler{
public static final String PIG_MAP_STORES = "pig.map.stores";
public static final String PIG_REDUCE_STORES = "pig.reduce.stores";
+ private static final String ROLLUP_PARTITIONER =
RollupHIIPartitioner.class.getName();
+
// A mapping of job to pair of store locations and tmp locations for that
job
private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
@@ -524,6 +528,9 @@ public class JobControlCompiler{
configureCompression(conf);
try{
+ //Set default value for PIG_HII_ROLLUP_OPTIMIZABLE to false
+ conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false);
+
//Process the POLoads
List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan,
POLoad.class);
@@ -836,13 +843,52 @@ public class JobControlCompiler{
log.info("Setting identity combiner class.");
}
pack = (POPackage)mro.reducePlan.getRoots().get(0);
- if(!pigContext.inIllustrator)
+
+ if(pack!=null) {
+ if(pack.getPivot()!=-1) {
+ //Set value for PIG_HII_ROLLUP_OPTIMIZABLE to true
+
conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, true);
+ //Set the pivot value
+ conf.setInt(PigConstants.PIG_HII_ROLLUP_PIVOT,
pack.getPivot());
+ //Set the index of the first field involved in ROLLUP
+ conf.setInt(PigConstants.PIG_HII_ROLLUP_FIELD_INDEX,
pack.getRollupFieldIndex());
+ //Set the original index of the first field involved
in ROLLUP in case it was moved to the end
+ //(if we have the combination of cube and rollup)
+
conf.setInt(PigConstants.PIG_HII_ROLLUP_OLD_FIELD_INDEX,
pack.getRollupOldFieldIndex());
+ //Set the total number of fields involved in the CUBE
clause
+ conf.setInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD,
pack.getDimensionSize());
+ //Set the number of algebraic functions that are used after
the rollup
+ conf.setInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC,
pack.getNumberAlgebraic());
+ //Set number of reducers to 1 due to using the IRG algorithm
+ if(pack.getPivot() == 0 && !mro.reducePlan.isEmpty()) {
+ updateNumReducers(plan, mro, nwJob);
+ }
+ }
+ }
+
+ if (!pigContext.inIllustrator) {
mro.reducePlan.remove(pack);
- nwJob.setMapperClass(PigMapReduce.Map.class);
+ }
+
+ if (pack!=null && pack.getPivot()!=-1) {
+ nwJob.setMapperClass(PigMapReduce.MapRollupHII.class);
+ } else {
+ nwJob.setMapperClass(PigMapReduce.Map.class);
+ }
+
nwJob.setReducerClass(PigMapReduce.Reduce.class);
- if (mro.customPartitioner != null)
-
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
+ // Set Rollup Partitioner in case the pivot is not equal to -1
+ // and the customPartitioner name is our rollup partitioner.
+ if (mro.customPartitioner != null) {
+ if (mro.customPartitioner.equals(ROLLUP_PARTITIONER)) {
+ if (pack.getPivot()!=-1) {
+
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
+ }
+ } else {
+
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
+ }
+ }
if(!pigContext.inIllustrator)
conf.set("pig.mapPlan",
ObjectSerializer.serialize(mro.mapPlan));
@@ -1058,6 +1104,26 @@ public class JobControlCompiler{
}
/**
+ * If pivot position is zero, we use only one reducer
+ * @param plan the MR plan
+ * @param mro the MR operator
+ * @param nwJob the current job
+ * @throws IOException
+ */
+ public void updateNumReducers(MROperPlan plan, MapReduceOper mro,
+ org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
+ // Change number of reducer to 1 if only IRG is used
+ if (mro.customPartitioner != null &&
mro.customPartitioner.equals(ROLLUP_PARTITIONER)) {
+ log.info("Changing Parallelism to 1 due to using IRG");
+ }
+ conf.setInt("pig.info.reducers.default.parallel", 1);
+ conf.setInt("pig.info.reducers.requested.parallel", 1);
+ conf.setInt("pig.info.reducers.estimated.parallel", 1);
+ conf.setInt(MRConfiguration.REDUCE_TASKS, 1);
+ nwJob.setNumReduceTasks(1);
+ }
+
+ /**
* Calculate the runtime #reducers based on the default_parallel,
requested parallel and estimated
* parallel, and save it to MapReduceOper's runtimeParallelism.
* @return the runtimeParallelism
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Fri Dec 12 20:15:26 2014
@@ -76,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -1098,6 +1099,11 @@ public class MRCompiler extends PhyPlanV
}
@Override
+ public void visitPORollupHIIForEach(PORollupHIIForEach op) throws
VisitorException {
+ visitPOForEach(op);
+ }
+
+ @Override
public void visitGlobalRearrange(POGlobalRearrange op) throws
VisitorException{
try{
blocking(op);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
Fri Dec 12 20:15:26 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.pig.JVMReuseManager;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
@@ -50,6 +51,7 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
@@ -137,6 +139,92 @@ public class PigGenericMapReduce {
}
/**
+ * This map is only used for the Rollup when the RollupHIIOptimizer is
enabled
+ *
+ */
+ public static class MapRollupHII extends PigMapBase {
+ @Override
+ public void collect(Context oc, Tuple tuple)
+ throws InterruptedException, IOException {
+
+ Byte index = (Byte)tuple.get(0);
+ PigNullableWritable key =
+ HDataType.getWritableComparableTypes(tuple.get(1), keyType);
+ NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+
+ // Both the key and the value need the index. The key needs it so
+ // that it can be sorted on the index in addition to the key
+ // value. The value needs it so that POPackage can properly
+ // assign the tuple to its slot in the projection.
+ key.setIndex(index);
+ val.setIndex(index);
+
+ oc.write(key, val);
+ }
+
+ @Override
+ public void cleanup(Context oc)
+ throws InterruptedException, IOException {
+
+ Configuration jConf = oc.getConfiguration();
+
+ boolean isHII =
jConf.getBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false);
+ //If our rule is enabled and in use, there will be a
PORollupHIIForEach.
+ //We will create marker tuples which serve as markers for
reducers
+ //to calculate the remaining results when that reducer reaches the
end of the
+ //input records. This marker tuple will be larger than the
default by one
+ //dimension. This additional dimension holds a value
ranging from 0 to
+ //the number of reducers. By this addition, we can make sure that
every reducer can
+ //receive these marker tuples to finish its work.
+ if(isHII) {
+ int reducerNo = jConf.getInt("mapred.reduce.tasks", 0);
+ int length =
jConf.getInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 0);
+ int nAlgebraic =
jConf.getInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC, 1);
+
+ if(length == 0)
+ return;
+
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
+ //An array of marker tuples whose size equals the number of
reducers
+ Tuple group[] = new Tuple[reducerNo];
+ int count = 0;
+ //Make sure that all reducers will receive those marker tuples
+ while(count < reducerNo) {
+ //Create a marker tuple whose last field is the reducer's
index,
+ //the rest are null.
+ group[count] = mTupleFactory.newTuple();
+ for (int k = 0; k <= length; k++) {
+ if(k < length) {
+ group[count].append(null);
+ } else {
+ group[count].append(count);
+ }
+ }
+
+ Tuple value = mTupleFactory.newTuple();
+ Tuple []tmp = new Tuple[nAlgebraic];
+ long valtmp = 1;
+ for(int i = 0; i < nAlgebraic; i++){
+ tmp[i] = mTupleFactory.newTuple();
+ tmp[i].append(valtmp);
+ value.append(tmp[i]);
+ }
+ Tuple out = mTupleFactory.newTuple();
+ out.append(0);
+ out.append(group[count]);
+ out.append(value);
+
+ PigNullableWritable key =
HDataType.getWritableComparableTypes(out.get(1), keyType);
+ NullableTuple val = new NullableTuple((Tuple)out.get(2));
+ oc.write(key, val);
+ count++;
+ }
+ }
+ super.cleanup(oc);
+ }
+ }
+
+ /**
* This "specialized" map class is ONLY to be used in pig queries with
* order by a udf. A UDF used for comparison in the order by expects
* to be handed tuples. Hence this map class ensures that the "key" used
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java?rev=1645056&view=auto
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
(added)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
Fri Dec 12 20:15:26 2014
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * This class provides a partitioner when the RollupOptimizer is activated. The
+ * map output key space is partitioned by the dimension which the pivot
position
+ * is assigned to. For example, we have a tuple (year, month, day, payload) and
+ * the pivot is 2, so the map output space will be partitioned by month, which
+ * means that there will be one reducer per month.
+ */
+public class RollupHIIPartitioner extends HashPartitioner<PigNullableWritable,
Writable> implements Configurable {
+
+ protected MessageDigest m = null;
+ protected int pivot = 0;
+ protected int rollupFieldIndex = 0;
+ protected int rollupOldFieldIndex = 0;
+ protected boolean pivotZero = false;
+ protected int length = 0;
+
+ public RollupHIIPartitioner() throws NoSuchAlgorithmException {
+ m = MessageDigest.getInstance("MD5");
+ }
+
+ public void setPivot(int pvt) {
+ pivot = pvt;
+ }
+
+ public int getPartition(PigNullableWritable key, Writable value, int
numPartitions) {
+ try {
+ Tuple t = (Tuple) key.getValueAsPigType();
+ if (pivot == -1)
+ return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
+
+ // We use IRG --> only one reducer.
+ if (pivotZero) {
+ return 0;
+ } else {
+ // We transfer them to the determined reducer.
+ if (t.get(pivot - 1) == null) {
+ // Check if it is a marker tuple
+ if (t.size() > length) {
+ int lenSpecial = t.size();
+ // Send it to the reducer which has been decided before
+ // by the additional dimension we added in the cleanup
+ // phase of each map.
+ return (Integer) t.get(lenSpecial - 1);
+ } else
+ return 0;
+ } else {
+ // We partition the key output space by the dimension which
+ // the pivot is assigned at. We use MD5 instead of hash
+ // partitioner because partition with MD5 will provide us a
+ // better randomization than the default hash.
+ m.reset();
+ for (int i = rollupFieldIndex; i < pivot; i++) {
+ Object a = t.get(i);
+ ByteArrayOutputStream bos = new
ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(a);
+ oos.flush();
+ oos.close();
+ bos.close();
+ byte []tmp = bos.toByteArray();
+
m.update(ByteBuffer.allocate(tmp.length).put(tmp).array());
+ }
+ return (m.digest()[15] & Integer.MAX_VALUE) %
numPartitions;
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Configuration getConf() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ //Get the pivot
+ pivot = conf.getInt(PigConstants.PIG_HII_ROLLUP_PIVOT, -1);
+ //Get the index of the first field involved in ROLLUP
+ rollupFieldIndex =
conf.getInt(PigConstants.PIG_HII_ROLLUP_FIELD_INDEX, 0);
+ //Get the original index of the first field involved in ROLLUP in case
it was moved to the end
+ //(if we have the combination of cube and rollup)
+ rollupOldFieldIndex =
conf.getInt(PigConstants.PIG_HII_ROLLUP_OLD_FIELD_INDEX, 0);
+ //Get the size of total fields that involve in CUBE clause
+ length = conf.getInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 0);
+ // We must check the original pivot value before it is updated
+ // if there are many rollup/cube.
+ if (pivot == 0) {
+ pivotZero = true;
+ }
+
+ //The Rollup was moved to the end of the clause, because there is(are)
+ //Cube operator, so we need to update the pivot position.
+ if (rollupFieldIndex != 0) {
+ pivot = pivot + rollupFieldIndex;
+ }
+ }
+}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
Fri Dec 12 20:15:26 2014
@@ -45,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.builtin.RollupDimensions;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.SchemaTupleFactory;
@@ -86,6 +87,28 @@ public class POUserFunc extends Expressi
private long timingFrequency = 100L;
private boolean doTiming = false;
+ private static final String ROLLUP_UDF = RollupDimensions.class.getName();
+ //the pivot value
+ private int pivot = -1;
+
+ private boolean rollupHIIoptimizable = false;
+
+ public void setPivot(int pvt) {
+ this.pivot = pvt;
+ }
+
+ public int getPivot() {
+ return this.pivot;
+ }
+
+ public void setRollupHIIOptimizable(boolean check) {
+ this.rollupHIIoptimizable = check;
+ }
+
+ public boolean getRollupHIIOptimizable() {
+ return this.rollupHIIoptimizable;
+ }
+
public PhysicalOperator getReferencedOperator() {
return referencedOperator;
}
@@ -127,6 +150,17 @@ public class POUserFunc extends Expressi
if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
executor = new MonitoredUDFExecutor(func);
}
+
+ if (funcSpec.getClassName().equals(ROLLUP_UDF) &&
this.rollupHIIoptimizable != false) {
+ try {
+ ((RollupDimensions) func).setPivot(this.pivot);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ ((RollupDimensions)
func).setRollupHIIOptimizable(this.rollupHIIoptimizable);
+ }
+
//the next couple of initializations do not work as intended for the
following reasons
//the reporter and pigLogger are member variables of PhysicalOperator
//when instanitateFunc is invoked at deserialization time, both
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
Fri Dec 12 20:15:26 2014
@@ -67,6 +67,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -152,6 +153,10 @@ public class PhyPlanVisitor extends Plan
}
}
+ public void visitPORollupHIIForEach(PORollupHIIForEach nhfe) throws
VisitorException {
+ visitPOForEach(nhfe);
+ }
+
public void visitUnion(POUnion un) throws VisitorException{
//do nothing
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
Fri Dec 12 20:15:26 2014
@@ -87,6 +87,58 @@ public class POPackage extends PhysicalO
private transient boolean useDefaultBag;
private transient int accumulativeBatchSize;
+ //the pivot value
+ private int pivot = -1;
+ //the index of the first field involved in ROLLUP
+ protected int rollupFieldIndex = 0;
+ //the original index of the first field involved in ROLLUP in case it was
moved to the end
+ //(if we have the combination of cube and rollup)
+ private int rollupOldFieldIndex = 0;
+ //the size of total fields that involve in CUBE clause
+ private int dimensionSize = 0;
+ //number of algebraic function that used after rollup
+ private int nAlgebraic = 0;
+
+ public void setPivot(int pvt) {
+ this.pivot = pvt;
+ }
+
+ public int getPivot() {
+ return this.pivot;
+ }
+
+ public void setDimensionSize(int ds) {
+ this.dimensionSize = ds;
+ }
+
+ public int getDimensionSize() {
+ return this.dimensionSize;
+ }
+
+ public void setNumberAlgebraic(int na) {
+ this.nAlgebraic = na;
+ }
+
+ public int getNumberAlgebraic() {
+ return this.nAlgebraic;
+ }
+
+ public void setRollupOldFieldIndex(int rofi) {
+ this.rollupOldFieldIndex = rofi;
+ }
+
+ public int getRollupOldFieldIndex() {
+ return this.rollupOldFieldIndex;
+ }
+
+ public void setRollupFieldIndex(int rfi) {
+ this.rollupFieldIndex = rfi;
+ }
+
+ public int getRollupFieldIndex() {
+ return this.rollupFieldIndex;
+ }
+
public POPackage(OperatorKey k) {
this(k, -1, null);
}