Author: rohini
Date: Mon Oct 17 19:08:26 2016
New Revision: 1765354

URL: http://svn.apache.org/viewvc?rev=1765354&view=rev
Log:
PIG-5041: RoundRobinPartitioner is not deterministic when the order of input records changes (rohini)

Added:
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java
    pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Mon Oct 17 19:08:26 2016
@@ -30,6 +30,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5041: RoundRobinPartitioner is not deterministic when order of input 
records change (rohini)
+
 PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of 
Random (rohini)
 
 PIG-4951: Rename PIG_ATS_ENABLED constant (szita via daijy)

Modified: 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
--- 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Mon Oct 17 19:08:26 2016
@@ -45,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -53,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag
 import org.apache.pig.builtin.JsonStorage;
 import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -590,7 +590,7 @@ public class UnionOptimizer extends TezO
             // more union predecessors. Change it to SCATTER_GATHER
             if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
                 edge.dataMovementType = DataMovementType.SCATTER_GATHER;
-                edge.partitionerClass = RoundRobinPartitioner.class;
+                edge.partitionerClass = HashValuePartitioner.class;
                 edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
                 edge.inputClassName = UnorderedKVInput.class.getName();
             }

Added: 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1765354&view=auto
==============================================================================
--- 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
 (added)
+++ 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
 Mon Oct 17 19:08:26 2016
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+
+public class HashValuePartitioner extends Partitioner<Writable, Writable> {
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public int getPartition(Writable key, Writable value, int numPartitions) {
+        int hash = 17;
+        Tuple tuple;
+        if (value instanceof Tuple) {
+            // union optimizer turned off
+            tuple = (Tuple) value;
+        } else {
+            // union followed by order by or skewed join
+            tuple = (Tuple)((NullableTuple) value).getValueAsPigType();
+        }
+        if (tuple != null) {
+            for (Object o : tuple.getAll()) {
+                if (o != null) {
+                    // Skip computing hashcode for bags.
+                    // Order of elements in the map/bag may be different on 
each run
+                    if (o instanceof DataBag) {
+                        hash = 31 * hash;
+                    } else if (o instanceof Map) {
+                        // Including size of map as it is easily available
+                        // Not doing for DataBag as some implementations 
actually
+                        // iterate through all elements in the bag to get the 
size.
+                        hash = 31 * hash + ((Map) o).size();
+                    } else {
+                        hash = 31 * hash + o.hashCode();
+                    }
+                }
+            }
+        }
+        return (hash & Integer.MAX_VALUE) % numPartitions;
+    }
+
+}
\ No newline at end of file

Modified: 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
--- 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 (original)
+++ 
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 Mon Oct 17 19:08:26 2016
@@ -40,9 +40,9 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -269,7 +269,7 @@ public class TezCompilerUtil {
         } else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
             edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
             edge.inputClassName = UnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
+            edge.partitionerClass = HashValuePartitioner.class;
         }
         
edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);

Modified: 
pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
--- 
pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java 
(original)
+++ 
pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java 
Mon Oct 17 19:08:26 2016
@@ -22,6 +22,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
+/**
+ * This partitioner should be used with extreme caution and only in cases
+ * where the order of output records is guaranteed to be same. If the order of
+ * output records can vary on retries which is mostly the case, map reruns
+ * due to shuffle fetch failures can lead to data being partitioned differently
+ * and result in incorrect output due to loss or duplication of data.
+ * Refer PIG-5041 for more details.
+ *
+ * This will be removed in the next release as it is risky to use in most 
cases.
+ */
+@Deprecated
 public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
         implements Configurable {
 

Modified: 
pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java 
(original)
+++ pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java 
Mon Oct 17 19:08:26 2016
@@ -25,7 +25,6 @@ import java.io.IOException;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.PigServer;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.test.Util;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -39,12 +38,16 @@ public class TestTezJobExecution {
 
     private static final String TEST_DIR = 
Util.getTestDirectory(TestTezJobExecution.class);
 
+    private static final String INPUT_FILE = TEST_DIR + Path.SEPARATOR + 
"input";
     private PigServer pigServer;
 
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         Util.deleteDirectory(new File(TEST_DIR));
         new File(TEST_DIR).mkdirs();
+        Util.createLocalInputFile(INPUT_FILE, new String[] {
+            "1", "1", "1", "2", "2", "2"
+        });
     }
 
     @AfterClass
@@ -54,27 +57,21 @@ public class TestTezJobExecution {
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(Util.getLocalTestMode());
+        pigServer = new PigServer("tez_local");
     }
 
     @Test
-    public void testUnionParallelRoundRobinBatchSize() throws IOException {
-        String input = TEST_DIR + Path.SEPARATOR + "input1";
+    public void testUnionParallelHashValuePartition() throws IOException {
         String output = TEST_DIR + Path.SEPARATOR + "output1";
-        Util.createInputFile(pigServer.getPigContext(), input, new String[] {
-            "1", "1", "1", "2", "2", "2"
-        });
-        String query = "A = LOAD '" + input + "';"
-                + "B = LOAD '" + input + "';"
+        String query = "A = LOAD '" + INPUT_FILE + "';"
+                + "B = LOAD '" + INPUT_FILE + "';"
                 + "C = UNION A, B PARALLEL 2;"
                 + "STORE C into '" + output + "';";
-        pigServer.getPigContext().getProperties().setProperty(
-                RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 
"3");
         pigServer.registerQuery(query);
         String part0 = FileUtils.readFileToString(new File(output + 
Path.SEPARATOR + "part-v002-o000-r-00000"));
         String part1 = FileUtils.readFileToString(new File(output + 
Path.SEPARATOR + "part-v002-o000-r-00001"));
-        assertEquals("1\n1\n1\n1\n1\n1\n", part0);
-        assertEquals("2\n2\n2\n2\n2\n2\n", part1);
+        assertEquals("2\n2\n2\n2\n2\n2\n", part0);
+        assertEquals("1\n1\n1\n1\n1\n1\n", part1);
     }
 
 }


Reply via email to