Author: cheolsoo
Date: Fri Dec 12 20:15:26 2014
New Revision: 1645056

URL: http://svn.apache.org/r1645056
Log:
PIG-4066: An optimization for ROLLUP operation in Pig (cheolsoo)

Added:
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java
    
pig/trunk/src/org/apache/pig/newplan/logical/relational/LORollupHIIForEach.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/RollupHIIOptimizer.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/PigConstants.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    pig/trunk/src/org/apache/pig/builtin/RollupDimensions.java
    
pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    
pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCube.java
    
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
    
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java
    pig/trunk/src/org/apache/pig/parser/AliasMasker.g
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryLexer.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/test/TestCubeOperator.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Dec 12 20:15:26 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4066: An optimization for ROLLUP operation in Pig (cheolsoo)
+
 PIG-4333: Split BigData tests into multiple groups (rohini)
  
 BUG FIXES

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Fri Dec 12 20:15:26 2014
@@ -895,6 +895,7 @@ public class Main {
             System.out.println("            GroupByConstParallelSetter - Force 
parallel 1 for \"group all\" statement");
             System.out.println("            PartitionFilterOptimizer - 
Pushdown partition filter conditions to loader implementing LoadMetaData");
             System.out.println("            PredicatePushdownOptimizer - 
Pushdown filter predicates to loader implementing LoadPredicatePushDown");
+            System.out.println("            RollupHIIOptimizer - Apply Rollup 
HII optimization");
             System.out.println("            All - Disable all optimizations");
             System.out.println("        All optimizations listed here are 
enabled by default. Optimization values are case insensitive.");
             System.out.println("    -v, -verbose - Print all error messages to 
screen");

Modified: pig/trunk/src/org/apache/pig/PigConstants.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConstants.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConstants.java (original)
+++ pig/trunk/src/org/apache/pig/PigConstants.java Fri Dec 12 20:15:26 2014
@@ -59,4 +59,45 @@ public class PigConstants {
     public static final String TIME_UDFS_ELAPSED_TIME_COUNTER = 
"approx_microsecs";
 
     public static final String TASK_INDEX = "mapreduce.task.index";
+
+    /**
+     * Configuration key for the boolean flag that records whether the rollup
+     * was found to be optimizable after going through the RollupHIIOptimizer.
+     */
+    public static final String PIG_HII_ROLLUP_OPTIMIZABLE = 
"pig.hii.rollup.optimizable";
+
+    /**
+     * Configuration key that stores the pivot position. If the rollup is not
+     * optimizable, this value is -1. If the rollup is optimizable, the value
+     * is the pivot the user specified in the rollup clause or, when the user
+     * did not specify one, the median position of the fields in the rollup
+     * clause.
+     */
+    public static final String PIG_HII_ROLLUP_PIVOT = "pig.hii.rollup.pivot";
+
+    /**
+     * Configuration key that stores the index of the first field involved in
+     * the rollup (or, when a cube is also present, the index of the first
+     * rollup field after the rollup has been moved to the end).
+     */
+    public static final String PIG_HII_ROLLUP_FIELD_INDEX = 
"pig.hii.rollup.field.index";
+
+    /**
+     * Configuration key that stores the index of the first field involved in
+     * the rollup before the rollup is moved to the end, which happens when a
+     * cube is also present.
+     */
+    public static final String PIG_HII_ROLLUP_OLD_FIELD_INDEX = 
"pig.hii.rollup.old.field.index";
+
+    /**
+     * Configuration key that stores the total number of fields involved in
+     * the CUBE clause. For example, given either of these two CUBE clauses:
+     * B = CUBE A BY CUBE(year, month, day), ROLLUP(hour, minute, second);
+     * B = CUBE A BY ROLLUP(year, month, day, hour, minute, second);
+     * the value is 6 in both cases.
+     */
+    public static final String PIG_HII_NUMBER_TOTAL_FIELD = 
"pig.hii.number.total.field";
+
+    /**
+     * Configuration key that stores the number of algebraic functions used
+     * after the rollup.
+     */
+    public static final String PIG_HII_NUMBER_ALGEBRAIC = 
"pig.hii.number.algebraic";
 }
\ No newline at end of file

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Fri Dec 12 20:15:26 2014
@@ -66,12 +66,14 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.OverwritableStoreFunc;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.RollupHIIPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
@@ -183,6 +185,8 @@ public class JobControlCompiler{
     public static final String PIG_MAP_STORES = "pig.map.stores";
     public static final String PIG_REDUCE_STORES = "pig.reduce.stores";
 
+    private static final String ROLLUP_PARTITIONER = 
RollupHIIPartitioner.class.getName();
+
     // A mapping of job to pair of store locations and tmp locations for that 
job
     private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
 
@@ -524,6 +528,9 @@ public class JobControlCompiler{
         configureCompression(conf);
 
         try{
+            //Set default value for PIG_HII_ROLLUP_OPTIMIZABLE to false
+            conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false);
+
             //Process the POLoads
             List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, 
POLoad.class);
 
@@ -836,13 +843,52 @@ public class JobControlCompiler{
                     log.info("Setting identity combiner class.");
                 }
                 pack = (POPackage)mro.reducePlan.getRoots().get(0);
-                if(!pigContext.inIllustrator)
+
+                if(pack!=null) {
+                    if(pack.getPivot()!=-1) {
+                        //Set value for PIG_HII_ROLLUP_OPTIMIZABLE to true
+                        
conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, true);
+                        //Set the pivot value
+                        conf.setInt(PigConstants.PIG_HII_ROLLUP_PIVOT, 
pack.getPivot());
+                        //Set the index of the first field involves in ROLLUP
+                        conf.setInt(PigConstants.PIG_HII_ROLLUP_FIELD_INDEX, 
pack.getRollupFieldIndex());
+                        //Set the original index of the first field involves 
in ROLLUP in case it was moved to the end
+                        //(if we have the combination of cube and rollup)
+                        
conf.setInt(PigConstants.PIG_HII_ROLLUP_OLD_FIELD_INDEX, 
pack.getRollupOldFieldIndex());
+                        //Set the size of total fields that involve in CUBE 
clause
+                        conf.setInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 
pack.getDimensionSize());
+                        //Set number of algebraic functions that used after 
rollup
+                        conf.setInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC, 
pack.getNumberAlgebraic());
+                        //Set number of reducer to 1 due to using IRG algorithm
+                        if(pack.getPivot() == 0 && !mro.reducePlan.isEmpty()) {
+                            updateNumReducers(plan, mro, nwJob);
+                        }
+                    }
+                }
+
+                if (!pigContext.inIllustrator) {
                     mro.reducePlan.remove(pack);
-                nwJob.setMapperClass(PigMapReduce.Map.class);
+                }
+
+                if (pack!=null && pack.getPivot()!=-1) {
+                    nwJob.setMapperClass(PigMapReduce.MapRollupHII.class);
+                } else {
+                    nwJob.setMapperClass(PigMapReduce.Map.class);
+                }
+
                 nwJob.setReducerClass(PigMapReduce.Reduce.class);
 
-                if (mro.customPartitioner != null)
-                    
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
+                // Set Rollup Partitioner in case the pivot is not equal to -1
+                // and the customPartitioner name is our rollup partitioner.
+                if (mro.customPartitioner != null) {
+                    if (mro.customPartitioner.equals(ROLLUP_PARTITIONER)) {
+                        if (pack.getPivot()!=-1) {
+                            
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
+                        }
+                    } else {
+                        
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
+                    }
+                }
 
                 if(!pigContext.inIllustrator)
                     conf.set("pig.mapPlan", 
ObjectSerializer.serialize(mro.mapPlan));
@@ -1058,6 +1104,26 @@ public class JobControlCompiler{
     }
 
     /**
+     * Forces the job to run with exactly one reducer. In this change it is
+     * called from the job-construction code when the package's pivot
+     * position is zero and the reduce plan is not empty.
+     *
+     * The method writes 1 into the three "pig.info.reducers.*.parallel"
+     * entries and into MRConfiguration.REDUCE_TASKS on the compiler's conf,
+     * and calls setNumReduceTasks(1) on the job.
+     *
+     * NOTE(review): the check against ROLLUP_PARTITIONER only guards the log
+     * message; the reducer count is forced to 1 whether or not the rollup
+     * partitioner is configured. Confirm that this is intended.
+     *
+     * @param plan the MR plan (not referenced in the method body)
+     * @param mro the MR operator whose customPartitioner is inspected
+     * @param nwJob the current job, whose reduce task count is set to 1
+     * @throws IOException declared by the signature; nothing in the visible
+     *         body is documented here as throwing it - TODO confirm
+     */
+    public void updateNumReducers(MROperPlan plan, MapReduceOper mro,
+    org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
+        // Change number of reducer to 1 if only IRG is used
+        if (mro.customPartitioner != null && 
mro.customPartitioner.equals(ROLLUP_PARTITIONER)) {
+            log.info("Changing Parallelism to 1 due to using IRG");
+        }
+        conf.setInt("pig.info.reducers.default.parallel", 1);
+        conf.setInt("pig.info.reducers.requested.parallel", 1);
+        conf.setInt("pig.info.reducers.estimated.parallel", 1);
+        conf.setInt(MRConfiguration.REDUCE_TASKS, 1);
+        nwJob.setNumReduceTasks(1);
+    }
+
+    /**
      * Calculate the runtime #reducers based on the default_parallel, 
requested parallel and estimated
      * parallel, and save it to MapReduceOper's runtimeParallelism.
      * @return the runtimeParallelism

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Fri Dec 12 20:15:26 2014
@@ -76,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -1098,6 +1099,11 @@ public class MRCompiler extends PhyPlanV
     }
 
     @Override
+    public void visitPORollupHIIForEach(PORollupHIIForEach op) throws 
VisitorException {
+        visitPOForEach(op); // no rollup-specific compilation here: delegates to the POForEach visit
+    }
+
+    @Override
     public void visitGlobalRearrange(POGlobalRearrange op) throws 
VisitorException{
         try{
             blocking(op);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 Fri Dec 12 20:15:26 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -50,6 +51,7 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -137,6 +139,92 @@ public class PigGenericMapReduce {
     }
 
     /**
+     * This map is only used for the Rollup when the RollupHIIOptimizer is
+     * enabled. (In this same change JobControlCompiler installs
+     * PigMapReduce.MapRollupHII as the mapper when the package's pivot is not
+     * -1 - presumably that resolves to this class; verify.)
+     *
+     * collect: reads the index from field 0 of the tuple, builds the key from
+     * field 1 and the value from field 2, sets the index on both, and writes
+     * the pair to the context.
+     *
+     * cleanup: when PIG_HII_ROLLUP_OPTIMIZABLE is true in the job
+     * configuration and PIG_HII_NUMBER_TOTAL_FIELD ("length") is non-zero,
+     * writes one marker record per reducer before delegating to
+     * super.cleanup. The reducer count is read from "mapred.reduce.tasks"
+     * (default 0, in which case no marker is written). Each marker key is a
+     * tuple of "length" null fields followed by the reducer number; each
+     * marker value is a tuple of PIG_HII_NUMBER_ALGEBRAIC (default 1)
+     * single-field tuples holding the long value 1.
+     *
+     * NOTE(review): when "length" is 0 cleanup returns early and therefore
+     * never reaches super.cleanup(oc) - confirm that this is intended.
+     * NOTE(review): unlike collect, the marker key and value written in
+     * cleanup never have setIndex called on them - confirm.
+     */
+    public static class MapRollupHII extends PigMapBase {
+        @Override
+        public void collect(Context oc, Tuple tuple)
+                throws InterruptedException, IOException {
+
+            Byte index = (Byte)tuple.get(0);
+            PigNullableWritable key =
+                HDataType.getWritableComparableTypes(tuple.get(1), keyType);
+            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+
+            // Both the key and the value need the index.  The key needs it so
+            // that it can be sorted on the index in addition to the key
+            // value.  The value needs it so that POPackage can properly
+            // assign the tuple to its slot in the projection.
+            key.setIndex(index);
+            val.setIndex(index);
+
+            oc.write(key, val);
+        }
+
+        @Override
+        public void cleanup(Context oc)
+                throws InterruptedException, IOException {
+
+            Configuration jConf = oc.getConfiguration();
+
+            boolean isHII = 
jConf.getBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false);
+            //If our rule is enabled and is using, there will be a 
PORollupHIIForEach
+            //We will create marker tuples which are considered as markers for 
reducers
+            //to calculate the remaining results when that reducer goes to the 
end of the
+            //input records. This marker tuple will have larger size than the 
defaut by one
+            //dimension. This addition dimension will be the value which are 
ranged from 0 to
+            //number of reducers. By this addition, we can make sure that 
every reducers can
+            //receive these marker tuples to finish their works.
+            if(isHII) {
+                int reducerNo = jConf.getInt("mapred.reduce.tasks", 0);
+                int length = 
jConf.getInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 0);
+                int nAlgebraic = 
jConf.getInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC, 1);
+
+                if(length == 0)
+                    return; // NOTE(review): leaves cleanup without calling super.cleanup(oc)
+
+                TupleFactory mTupleFactory = TupleFactory.getInstance();
+                //An array of marker tuples which has size equals to number of 
reducers
+                Tuple group[] = new Tuple[reducerNo];
+                int count = 0;
+                //Make sure that all reducers will receive those marker tuples
+                while(count < reducerNo) {
+                    //Create marker tuple with last field is the reducer's 
index,
+                    //the rest are null.
+                    group[count] = mTupleFactory.newTuple();
+                    for (int k = 0; k <= length; k++) {
+                        if(k < length) {
+                            group[count].append(null);
+                        } else {
+                            group[count].append(count); // extra trailing field: the target reducer number
+                        }
+                    }
+
+                    Tuple value = mTupleFactory.newTuple();
+                    Tuple []tmp = new Tuple[nAlgebraic];
+                    long valtmp = 1;
+                    for(int i = 0; i < nAlgebraic; i++){
+                        tmp[i] = mTupleFactory.newTuple();
+                        tmp[i].append(valtmp);
+                        value.append(tmp[i]);
+                    }
+                    Tuple out = mTupleFactory.newTuple();
+                    out.append(0); // field 0 is not read back below; only fields 1 and 2 are used
+                    out.append(group[count]);
+                    out.append(value);
+
+                    PigNullableWritable key = 
HDataType.getWritableComparableTypes(out.get(1), keyType);
+                    NullableTuple val = new NullableTuple((Tuple)out.get(2));
+                    oc.write(key, val);
+                    count++;
+                }
+            }
+            super.cleanup(oc);
+         }
+    }
+
+    /**
      * This "specialized" map class is ONLY to be used in pig queries with
      * order by a udf. A UDF used for comparison in the order by expects
      * to be handed tuples. Hence this map class ensures that the "key" used

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java?rev=1645056&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/RollupHIIPartitioner.java
 Fri Dec 12 20:15:26 2014
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * This class provides a partitioner when the RollupOptimizer is activated. The
+ * map output key space is partitioned by the dimension which the pivot
+ * position is assigned to. For example, we have a tuple (year, month, day,
+ * payload) and the pivot is 2, so the map output space will be partitioned by
+ * month, which means that there will be one reducer per month.
+ *
+ * getPartition first casts the key's value to a Tuple and then decides, in
+ * this order:
+ *   - pivot == -1: (key.hashCode() & Integer.MAX_VALUE) % numPartitions.
+ *   - the configured pivot was 0 (pivotZero): always partition 0.
+ *   - field (pivot - 1) of the tuple is null: a tuple with more than "length"
+ *     fields is treated as a marker tuple and is sent to the partition number
+ *     stored in its last field; any other tuple goes to partition 0.
+ *   - otherwise: fields rollupFieldIndex .. pivot - 1 are written through an
+ *     ObjectOutputStream, fed to the MD5 digest, and byte 15 of the digest,
+ *     masked with Integer.MAX_VALUE, is taken modulo numPartitions.
+ * Any exception raised inside getPartition is rethrown wrapped in a
+ * RuntimeException.
+ *
+ * setConf loads pivot, rollupFieldIndex, rollupOldFieldIndex and length from
+ * the PigConstants.PIG_HII_* configuration keys, remembers whether the
+ * configured pivot was 0, and then adds rollupFieldIndex to pivot when that
+ * index is non-zero. getConf is not supported and throws
+ * UnsupportedOperationException.
+ *
+ * NOTE(review): the example above says "partitioned by month", but the code
+ * hashes every field from rollupFieldIndex up to pivot - 1, which for pivot 2
+ * and rollupFieldIndex 0 is fields 0 and 1 - confirm the intended wording.
+ * NOTE(review): the cast to Tuple happens before the pivot == -1 fallback, so
+ * a non-tuple key fails even on the fallback path - confirm.
+ * NOTE(review): only one digest byte is used, so at most 256 distinct values
+ * feed the final modulo.
+ * NOTE(review): rollupOldFieldIndex is assigned in setConf but never read in
+ * this class, and setPivot does not update pivotZero.
+ */
+public class RollupHIIPartitioner extends HashPartitioner<PigNullableWritable, 
Writable> implements Configurable {
+
+    protected MessageDigest m = null;
+    protected int pivot = 0;
+    protected int rollupFieldIndex = 0;
+    protected int rollupOldFieldIndex = 0;
+    protected boolean pivotZero = false;
+    protected int length = 0;
+
+    public RollupHIIPartitioner() throws NoSuchAlgorithmException {
+        m = MessageDigest.getInstance("MD5");
+    }
+
+    public void setPivot(int pvt) {
+        pivot = pvt;
+    }
+
+    public int getPartition(PigNullableWritable key, Writable value, int 
numPartitions) {
+        try {
+            Tuple t = (Tuple) key.getValueAsPigType();
+            if (pivot == -1)
+                return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
+
+            // We use IRG --> only one reducer.
+            if (pivotZero) {
+                return 0;
+            } else {
+                // We transfer them to the determined reducer.
+                if (t.get(pivot - 1) == null) {
+                    // Check if it is a marker tuple
+                    if (t.size() > length) {
+                        int lenSpecial = t.size();
+                        // Send it to the reducer which has been decided before
+                        // by the addition dimension we added in the cleanup
+                        // phase of each map.
+                        return (Integer) t.get(lenSpecial - 1); // NOTE(review): returned as-is, not reduced modulo numPartitions
+                    } else
+                        return 0;
+                } else {
+                    // We partition the key output space by the dimension which
+                    // the pivot is assigned at. We use MD5 instead of hash
+                    // partitioner because partition with MD5 will provide us a
+                    // better randomization than the default hash.
+                    m.reset();
+                    for (int i = rollupFieldIndex; i < pivot; i++) {
+                        Object a = t.get(i);
+                        ByteArrayOutputStream bos = new 
ByteArrayOutputStream();
+                        ObjectOutputStream oos = new ObjectOutputStream(bos);
+                        oos.writeObject(a);
+                        oos.flush();
+                        oos.close();
+                        bos.close();
+                        byte []tmp = bos.toByteArray();
+                        
m.update(ByteBuffer.allocate(tmp.length).put(tmp).array());
+                    }
+                    return (m.digest()[15] & Integer.MAX_VALUE) % 
numPartitions;
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Configuration getConf() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        //Get the pivot
+        pivot = conf.getInt(PigConstants.PIG_HII_ROLLUP_PIVOT, -1);
+        //Get the index of the first field involves in ROLLUP
+        rollupFieldIndex = 
conf.getInt(PigConstants.PIG_HII_ROLLUP_FIELD_INDEX, 0);
+        //Get the original index of the first field involves in ROLLUP in case 
it was moved to the end
+        //(if we have the combination of cube and rollup)
+        rollupOldFieldIndex = 
conf.getInt(PigConstants.PIG_HII_ROLLUP_OLD_FIELD_INDEX, 0);
+        //Get the size of total fields that involve in CUBE clause
+        length = conf.getInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 0);
+        // We must check the original pivot value before it is updated
+        // if there are many rollup/cube.
+        if (pivot == 0) {
+            pivotZero = true;
+        }
+
+        //The Rollup was moved to the end of the clause, because there is(are)
+        //Cube operator, so we need to update the pivot position.
+        if (rollupFieldIndex != 0) {
+            pivot = pivot + rollupFieldIndex;
+        }
+    }
+}

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Fri Dec 12 20:15:26 2014
@@ -45,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
 import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.builtin.RollupDimensions;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.SchemaTupleFactory;
@@ -86,6 +87,28 @@ public class POUserFunc extends Expressi
     private long timingFrequency = 100L;
     private boolean doTiming = false;
 
+    private static final String ROLLUP_UDF = RollupDimensions.class.getName(); // class name used to recognise the RollupDimensions UDF
+    //the pivot value; stays -1 until setPivot is called
+    private int pivot = -1;
+
+    private boolean rollupHIIoptimizable = false; // stays false until setRollupHIIOptimizable is called
+
+    public void setPivot(int pvt) { // stores the pivot that this change later passes to RollupDimensions.setPivot
+        this.pivot = pvt;
+    }
+
+    public int getPivot() { // returns the stored pivot (-1 if it was never set)
+        return this.pivot;
+    }
+
+    public void setRollupHIIOptimizable(boolean check) { // stores the flag that this change later passes to RollupDimensions
+        this.rollupHIIoptimizable = check;
+    }
+
+    public boolean getRollupHIIOptimizable() { // returns the stored flag (false if it was never set)
+        return this.rollupHIIoptimizable;
+    }
+
     public PhysicalOperator getReferencedOperator() {
         return referencedOperator;
     }
@@ -127,6 +150,17 @@ public class POUserFunc extends Expressi
         if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
             executor = new MonitoredUDFExecutor(func);
         }
+
+        if (funcSpec.getClassName().equals(ROLLUP_UDF) && 
this.rollupHIIoptimizable != false) {
+            try {
+                ((RollupDimensions) func).setPivot(this.pivot);
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+            ((RollupDimensions) 
func).setRollupHIIOptimizable(this.rollupHIIoptimizable);
+        }
+
         //the next couple of initializations do not work as intended for the 
following reasons
         //the reporter and pigLogger are member variables of PhysicalOperator
         //when instanitateFunc is invoked at deserialization time, both

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 Fri Dec 12 20:15:26 2014
@@ -67,6 +67,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -152,6 +153,10 @@ public class PhyPlanVisitor extends Plan
         }
     }
 
+    public void visitPORollupHIIForEach(PORollupHIIForEach nhfe) throws 
VisitorException {
+        visitPOForEach(nhfe); // default behaviour: handled by the POForEach visit method
+    }
+
     public void visitUnion(POUnion un) throws VisitorException{
         //do nothing
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1645056&r1=1645055&r2=1645056&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 Fri Dec 12 20:15:26 2014
@@ -87,6 +87,58 @@ public class POPackage extends PhysicalO
     private transient boolean useDefaultBag;
     private transient int accumulativeBatchSize;
 
+    //the pivot value; JobControlCompiler in this change applies the rollup settings only when it is not -1
+    private int pivot = -1;
+    //the index of the first field involved in ROLLUP
+    protected int rollupFieldIndex = 0;
+    //the original index of the first field involves in ROLLUP in case it was 
moved to the end
+    //(if we have the combination of cube and rollup)
+    private int rollupOldFieldIndex = 0;
+    //the total number of fields involved in the CUBE clause
+    private int dimensionSize = 0;
+    //number of algebraic functions used after rollup
+    private int nAlgebraic = 0;
+
+    public void setPivot(int pvt) {
+        this.pivot = pvt;
+    }
+
+    public int getPivot() { // written to PIG_HII_ROLLUP_PIVOT by JobControlCompiler
+        return this.pivot;
+    }
+
+    public void setDimensionSize(int ds) {
+        this.dimensionSize = ds;
+    }
+
+    public int getDimensionSize() { // written to PIG_HII_NUMBER_TOTAL_FIELD by JobControlCompiler
+        return this.dimensionSize;
+    }
+
+    public void setNumberAlgebraic(int na) {
+        this.nAlgebraic = na;
+    }
+
+    public int getNumberAlgebraic() { // written to PIG_HII_NUMBER_ALGEBRAIC by JobControlCompiler
+        return this.nAlgebraic;
+    }
+
+    public void setRollupOldFieldIndex(int rofi) {
+        this.rollupOldFieldIndex = rofi;
+    }
+
+    public int getRollupOldFieldIndex() { // written to PIG_HII_ROLLUP_OLD_FIELD_INDEX by JobControlCompiler
+        return this.rollupOldFieldIndex;
+    }
+
+    public void setRollupFieldIndex(int rfi) {
+        this.rollupFieldIndex = rfi;
+    }
+
+    public int getRollupFieldIndex() { // written to PIG_HII_ROLLUP_FIELD_INDEX by JobControlCompiler
+        return this.rollupFieldIndex;
+    }
+
     public POPackage(OperatorKey k) {
         this(k, -1, null);
     }


Reply via email to