Author: rohini
Date: Mon Oct 17 19:08:26 2016
New Revision: 1765354
URL: http://svn.apache.org/viewvc?rev=1765354&view=rev
Log:
PIG-5041: RoundRobinPartitioner is not deterministic when order of input
records change (rohini)
Added:
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
Modified:
pig/branches/branch-0.16/CHANGES.txt
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java
pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java
Modified: pig/branches/branch-0.16/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Mon Oct 17 19:08:26 2016
@@ -30,6 +30,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5041: RoundRobinPartitioner is not deterministic when order of input
records change (rohini)
+
PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of
Random (rohini)
PIG-4951: Rename PIG_ATS_ENABLED constant (szita via daijy)
Modified:
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
---
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
(original)
+++
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
Mon Oct 17 19:08:26 2016
@@ -45,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -53,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag
import org.apache.pig.builtin.JsonStorage;
import org.apache.pig.builtin.OrcStorage;
import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -590,7 +590,7 @@ public class UnionOptimizer extends TezO
// more union predecessors. Change it to SCATTER_GATHER
if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
edge.dataMovementType = DataMovementType.SCATTER_GATHER;
- edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.partitionerClass = HashValuePartitioner.class;
edge.outputClassName =
UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
}
Added:
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1765354&view=auto
==============================================================================
---
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
(added)
+++
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
Mon Oct 17 19:08:26 2016
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+
+public class HashValuePartitioner extends Partitioner<Writable, Writable> {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public int getPartition(Writable key, Writable value, int numPartitions) {
+ int hash = 17;
+ Tuple tuple;
+ if (value instanceof Tuple) {
+ // union optimizer turned off
+ tuple = (Tuple) value;
+ } else {
+ // union followed by order by or skewed join
+ tuple = (Tuple)((NullableTuple) value).getValueAsPigType();
+ }
+ if (tuple != null) {
+ for (Object o : tuple.getAll()) {
+ if (o != null) {
+ // Skip computing hashcode for bags.
+ // Order of elements in the map/bag may be different on
each run
+ if (o instanceof DataBag) {
+ hash = 31 * hash;
+ } else if (o instanceof Map) {
+ // Including size of map as it is easily available
+ // Not doing for DataBag as some implementations
actually
+ // iterate through all elements in the bag to get the
size.
+ hash = 31 * hash + ((Map) o).size();
+ } else {
+ hash = 31 * hash + o.hashCode();
+ }
+ }
+ }
+ }
+ return (hash & Integer.MAX_VALUE) % numPartitions;
+ }
+
+}
\ No newline at end of file
Modified:
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
---
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
(original)
+++
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
Mon Oct 17 19:08:26 2016
@@ -40,9 +40,9 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.TOBAG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
@@ -269,7 +269,7 @@ public class TezCompilerUtil {
} else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
edge.outputClassName =
UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
- edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.partitionerClass = HashValuePartitioner.class;
}
edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
Modified:
pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
---
pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java
(original)
+++
pig/branches/branch-0.16/src/org/apache/pig/builtin/RoundRobinPartitioner.java
Mon Oct 17 19:08:26 2016
@@ -22,6 +22,17 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
+/**
+ * This partitioner should be used with extreme caution and only in cases
+ * where the order of output records is guaranteed to be same. If the order of
+ * output records can vary on retries which is mostly the case, map reruns
+ * due to shuffle fetch failures can lead to data being partitioned differently
+ * and result in incorrect output due to loss or duplication of data.
+ * Refer PIG-5041 for more details.
+ *
+ * This will be removed in the next release as it is risky to use in most
cases.
+ */
+@Deprecated
public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
implements Configurable {
Modified:
pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1765354&r1=1765353&r2=1765354&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java
(original)
+++ pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobExecution.java
Mon Oct 17 19:08:26 2016
@@ -25,7 +25,6 @@ import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.pig.PigServer;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.test.Util;
import org.junit.AfterClass;
import org.junit.Before;
@@ -39,12 +38,16 @@ public class TestTezJobExecution {
private static final String TEST_DIR =
Util.getTestDirectory(TestTezJobExecution.class);
+ private static final String INPUT_FILE = TEST_DIR + Path.SEPARATOR +
"input";
private PigServer pigServer;
@BeforeClass
public static void oneTimeSetUp() throws Exception {
Util.deleteDirectory(new File(TEST_DIR));
new File(TEST_DIR).mkdirs();
+ Util.createLocalInputFile(INPUT_FILE, new String[] {
+ "1", "1", "1", "2", "2", "2"
+ });
}
@AfterClass
@@ -54,27 +57,21 @@ public class TestTezJobExecution {
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(Util.getLocalTestMode());
+ pigServer = new PigServer("tez_local");
}
@Test
- public void testUnionParallelRoundRobinBatchSize() throws IOException {
- String input = TEST_DIR + Path.SEPARATOR + "input1";
+ public void testUnionParallelHashValuePartition() throws IOException {
String output = TEST_DIR + Path.SEPARATOR + "output1";
- Util.createInputFile(pigServer.getPigContext(), input, new String[] {
- "1", "1", "1", "2", "2", "2"
- });
- String query = "A = LOAD '" + input + "';"
- + "B = LOAD '" + input + "';"
+ String query = "A = LOAD '" + INPUT_FILE + "';"
+ + "B = LOAD '" + INPUT_FILE + "';"
+ "C = UNION A, B PARALLEL 2;"
+ "STORE C into '" + output + "';";
- pigServer.getPigContext().getProperties().setProperty(
- RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE,
"3");
pigServer.registerQuery(query);
String part0 = FileUtils.readFileToString(new File(output +
Path.SEPARATOR + "part-v002-o000-r-00000"));
String part1 = FileUtils.readFileToString(new File(output +
Path.SEPARATOR + "part-v002-o000-r-00001"));
- assertEquals("1\n1\n1\n1\n1\n1\n", part0);
- assertEquals("2\n2\n2\n2\n2\n2\n", part1);
+ assertEquals("2\n2\n2\n2\n2\n2\n", part0);
+ assertEquals("1\n1\n1\n1\n1\n1\n", part1);
}
}