Author: rohini
Date: Mon Oct 17 15:22:57 2016
New Revision: 1765312

URL: http://svn.apache.org/viewvc?rev=1765312&view=rev
Log:
PIG-5041: RoundRobinPartitioner is not deterministic when order of input 
records change (rohini)

Added:
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java
    pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 17 15:22:57 2016
@@ -48,6 +48,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5041: RoundRobinPartitioner is not deterministic when order of input 
records change (rohini)
+
 PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of 
Random (rohini
 
 PIG-5038: Pig Limit_2 e2e test failed with sort check (Konstantin_Harasov via 
rohini)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Mon Oct 17 15:22:57 2016
@@ -45,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -53,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag
 import org.apache.pig.builtin.JsonStorage;
 import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -590,7 +590,7 @@ public class UnionOptimizer extends TezO
             // more union predecessors. Change it to SCATTER_GATHER
             if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
                 edge.dataMovementType = DataMovementType.SCATTER_GATHER;
-                edge.partitionerClass = RoundRobinPartitioner.class;
+                edge.partitionerClass = HashValuePartitioner.class;
                 edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
                 edge.inputClassName = UnorderedKVInput.class.getName();
             }

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1765312&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
 Mon Oct 17 15:22:57 2016
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+
+public class HashValuePartitioner extends Partitioner<Writable, Writable> {
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public int getPartition(Writable key, Writable value, int numPartitions) {
+        int hash = 17;
+        Tuple tuple;
+        if (value instanceof Tuple) {
+            // union optimizer turned off
+            tuple = (Tuple) value;
+        } else {
+            // union followed by order by or skewed join
+            tuple = (Tuple)((NullableTuple) value).getValueAsPigType();
+        }
+        if (tuple != null) {
+            for (Object o : tuple.getAll()) {
+                if (o != null) {
+                    // Skip computing hashcode for bags.
+                    // Order of elements in the map/bag may be different on 
each run
+                    if (o instanceof DataBag) {
+                        hash = 31 * hash;
+                    } else if (o instanceof Map) {
+                        // Including size of map as it is easily available
+                        // Not doing for DataBag as some implementations 
actually
+                        // iterate through all elements in the bag to get the 
size.
+                        hash = 31 * hash + ((Map) o).size();
+                    } else {
+                        hash = 31 * hash + o.hashCode();
+                    }
+                }
+            }
+        }
+        return (hash & Integer.MAX_VALUE) % numPartitions;
+    }
+
+}
\ No newline at end of file

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 Mon Oct 17 15:22:57 2016
@@ -40,9 +40,9 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -269,7 +269,7 @@ public class TezCompilerUtil {
         } else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
             edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
             edge.inputClassName = UnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
+            edge.partitionerClass = HashValuePartitioner.class;
         }
         
edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);

Modified: pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java Mon Oct 17 
15:22:57 2016
@@ -22,6 +22,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
+/**
+ * This partitioner should be used with extreme caution and only in cases
+ * where the order of output records is guaranteed to be same. If the order of
+ * output records can vary on retries which is mostly the case, map reruns
+ * due to shuffle fetch failures can lead to data being partitioned differently
+ * and result in incorrect output due to loss or duplication of data.
+ * Refer PIG-5041 for more details.
+ *
+ * This will be removed in the next release as it is risky to use in most 
cases.
+ */
+@Deprecated
 public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
         implements Configurable {
 

Modified: pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java Mon Oct 17 
15:22:57 2016
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigServer;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.test.Util;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -72,19 +71,17 @@ public class TestTezJobExecution {
     }
 
     @Test
-    public void testUnionParallelRoundRobinBatchSize() throws IOException {
+    public void testUnionParallelHashValuePartition() throws IOException {
         String output = TEST_DIR + Path.SEPARATOR + "output1";
         String query = "A = LOAD '" + INPUT_FILE + "';"
                 + "B = LOAD '" + INPUT_FILE + "';"
                 + "C = UNION A, B PARALLEL 2;"
                 + "STORE C into '" + output + "';";
-        pigServer.getPigContext().getProperties().setProperty(
-                RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 
"3");
         pigServer.registerQuery(query);
         String part0 = FileUtils.readFileToString(new File(output + 
Path.SEPARATOR + "part-v002-o000-r-00000"));
         String part1 = FileUtils.readFileToString(new File(output + 
Path.SEPARATOR + "part-v002-o000-r-00001"));
-        assertEquals("1\n1\n1\n1\n1\n1\n", part0);
-        assertEquals("2\n2\n2\n2\n2\n2\n", part1);
+        assertEquals("2\n2\n2\n2\n2\n2\n", part0);
+        assertEquals("1\n1\n1\n1\n1\n1\n", part1);
     }
 
     @Test
@@ -108,7 +105,7 @@ public class TestTezJobExecution {
         // Recovery is not disabled when there is auto parallelism. Should 
reuse AM application session
         PigStats stats = PigRunner.run(args, listener);
         assertTrue(stats.isSuccessful());
-        assertEquals(listener.getJobsStarted().size(), 1);
+        assertEquals(1, listener.getJobsStarted().size());
 
         Util.deleteFile(pigServer.getPigContext(), output1);
         Util.deleteFile(pigServer.getPigContext(), output2);
@@ -122,7 +119,7 @@ public class TestTezJobExecution {
                 scriptFile };
         stats = PigRunner.run(args, listener);
         assertTrue(stats.isSuccessful());
-        assertEquals(listener.getJobsStarted().size(), 2);
+        assertEquals(2, listener.getJobsStarted().size());
     }
 
 


Reply via email to