Author: rohini
Date: Thu Oct 15 18:47:10 2015
New Revision: 1708865
URL: http://svn.apache.org/viewvc?rev=1708865&view=rev
Log:
PIG-4702: Load once for sampling and partitioning in order by for certain
LoadFuncs (rohini)
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/pig-default.properties
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1708865&r1=1708864&r2=1708865&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 15 18:47:10 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4702: Load once for sampling and partitioning in order by for certain LoadFuncs (rohini)
+
PIG-4699: Print Job stats information in Tez like mapreduce (rohini)
PIG-4554: Compress pig.script before encoding (sandyridgeracer via rohini)
Modified: pig/trunk/conf/pig.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1708865&r1=1708864&r2=1708865&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Oct 15 18:47:10 2015
@@ -615,4 +615,11 @@ hcat.bin=/usr/local/hcat/bin/hcat
# that are known to work with multiple vertices writing to same location
instead of a blacklist
#pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
-#pig.tez.opt.union.supported.storefuncs=
\ No newline at end of file
+#pig.tez.opt.union.supported.storefuncs=
+
+
+# For the LoadFuncs listed here, Pig reads the data source only once during sort instead of
+# loading it once for sampling and again for partitioning.
+# Used to avoid hitting external non-filesystem data sources like HBase and Accumulo twice.
+
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1708865&r1=1708864&r2=1708865&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Oct 15 18:47:10 2015
@@ -70,6 +70,14 @@ public class PigConfiguration {
public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS =
"pig.tez.opt.union.unsupported.storefuncs";
/**
+ * Comma-separated list of LoadFunc class names for which Pig reads the data source only once
+ * during sort, instead of loading it once for sampling and again for partitioning.
+ * Used to avoid hitting external non-filesystem data sources like HBase and Accumulo twice.
+ * Currently honored only by Pig on Tez.
+ */
+ public static final String PIG_SORT_READONCE_LOADFUNCS =
"pig.sort.readonce.loadfuncs";
+
+ /**
* Boolean value to enable or disable partial aggregation in map. Disabled
by default
*/
public static final String PIG_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1708865&r1=1708864&r2=1708865&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Thu Oct 15 18:47:10 2015
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -34,6 +35,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
@@ -171,6 +173,7 @@ public class TezCompiler extends PhyPlan
private int fileConcatenationThreshold = 100;
private boolean optimisticFileConcatenation = false;
+ private List<String> readOnceLoadFuncs = null;
private POLocalRearrangeTezFactory localRearrangeFactory;
@@ -201,6 +204,12 @@ public class TezCompiler extends PhyPlan
OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
LOG.info("File concatenation threshold: " + fileConcatenationThreshold
+ " optimistic? " + optimisticFileConcatenation);
+
+ String loadFuncs = pigContext.getProperties().getProperty(
+ PigConfiguration.PIG_SORT_READONCE_LOADFUNCS);
+ if (loadFuncs != null && loadFuncs.trim().length() > 0) {
+ readOnceLoadFuncs =
Arrays.asList(StringUtils.split(loadFuncs.trim()));
+ }
}
public TezOperPlan getTezPlan() {
@@ -1755,8 +1764,16 @@ public class TezCompiler extends PhyPlan
boolean writeDataForPartitioner = false;
if (samplerOper.plan.getRoots().get(0) instanceof POLoad) {
for (PhysicalOperator oper : samplerOper.plan) {
- if (oper instanceof POForEach || oper instanceof POLoad) {
+ if (oper instanceof POForEach) {
continue;
+ } else if (oper instanceof POLoad && oper.getInputs() == null)
{
+ // TODO: oper.getInputs() is not null in case of PONative, and
+ // clone needs to be fixed in that case. See e2e test Native_2.
+ String loadFunc = ((POLoad)
oper).getLoadFunc().getClass().getName();
+ // For read-once LoadFuncs, avoid reading all data from HBase/Accumulo twice (once for sampling, once for partitioning).
+ if (readOnceLoadFuncs == null ||
!readOnceLoadFuncs.contains(loadFunc)) {
+ continue;
+ }
}
writeDataForPartitioner = true;
break;
Modified: pig/trunk/src/pig-default.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1708865&r1=1708864&r2=1708865&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Thu Oct 15 18:47:10 2015
@@ -60,3 +60,5 @@ pig.stats.output.size.reader=org.apache.
pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
+
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld?rev=1708865&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld
(added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld
Thu Oct 15 18:47:10 2015
@@ -0,0 +1,73 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-11 -> Tez vertex scope-20,Tez vertex scope-30,
+Tez vertex scope-20 -> Tez vertex scope-30,
+Tez vertex scope-30 -> Tez vertex scope-32,
+Tez vertex scope-32
+
+Tez vertex scope-11
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-14 -> scope-20
+| |
+| Constant(DummyVal) - scope-13
+|
+|---New For Each(false,true)[tuple] - scope-19
+ | |
+ | Project[int][0] - scope-8
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-18
+ | |
+ | |---Project[tuple][*] - scope-17
+ |
+ |---ReservoirSample - scope-16
+ |
+ |---b: Local Rearrange[tuple]{int}(false) - scope-12 ->
scope-30
+ | |
+ | Project[int][0] - scope-8
+ |
+ |---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a:
Load(file:///tmp/input:org.apache.pig.backend.hadoop.hbase.HBaseStorage(',')) -
scope-0
+Tez vertex scope-20
+# Plan on vertex
+POValueOutputTez - scope-29 -> [scope-30]
+|
+|---New For Each(false)[tuple] - scope-28
+ | |
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
- scope-27
+ | |
+ | |---Project[tuple][*] - scope-26
+ |
+ |---New For Each(false,false)[tuple] - scope-25
+ | |
+ | Constant(-1) - scope-24
+ | |
+ | Project[bag][1] - scope-22
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-21
+Tez vertex scope-30
+# Plan on vertex
+POIdentityInOutTez - scope-31 <- scope-11 -> scope-32
+| |
+| Project[int][0] - scope-8
+Tez vertex scope-32
+# Plan on vertex
+b: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-10
+|
+|---New For Each(true)[tuple] - scope-35
+ | |
+ | Project[bag][1] - scope-34
+ |
+ |---Package(LitePackager)[tuple]{int} - scope-33
Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1708865&r1=1708864&r2=1708865&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Thu Oct 15 18:47:10
2015
@@ -339,6 +339,25 @@ public class TestTezCompiler {
run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld");
}
+    @Test
+    public void testOrderByReadOnceLoadFunc() throws Exception {
+        // Treat HBaseStorage/AccumuloStorage as read-once LoadFuncs; the expected plan
+        // (TEZC-Order-3.gld) loads the input once and feeds both sampling and partitioning.
+        setProperty("pig.sort.readonce.loadfuncs",
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage");
+        try {
+            String query =
+                    "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" +
+                    "b = order a by x;" +
+                    "STORE b INTO 'file:///tmp/output';";
+
+            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld");
+        } finally {
+            // Always reset so the setting cannot leak into later tests, even if run() fails.
+            setProperty("pig.sort.readonce.loadfuncs", null);
+        }
+    }
+
// PIG-3759, PIG-3781
// Combiner should not be added in case of co-group
@Test