Author: xuefu
Date: Fri Apr 1 20:19:53 2016
New Revision: 1737436
URL: http://svn.apache.org/viewvc?rev=1737436&view=rev
Log:
PIG-4855: Merge trunk[4] into spark branch (Pallavi via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/HKerberos.java
- copied unchanged from r1737432,
pig/trunk/src/org/apache/pig/backend/hadoop/HKerberos.java
pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
- copied unchanged from r1737432,
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
Modified:
pig/branches/spark/ (props changed)
pig/branches/spark/CHANGES.txt
pig/branches/spark/conf/pig.properties
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
pig/branches/spark/src/org/apache/pig/PigConfiguration.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
pig/branches/spark/src/org/apache/pig/impl/PigContext.java
pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java
pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java
pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java
pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushUpFilter.java
pig/branches/spark/test/org/apache/pig/test/TestPigStorage.java
pig/branches/spark/test/org/apache/pig/test/Util.java
pig/branches/spark/test/org/apache/pig/test/data/DotFiles/explain1.dot
pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
Propchange: pig/branches/spark/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 1 20:19:53 2016
@@ -1,2 +1,2 @@
/hadoop/pig/branches/multiquery:741727-770826
-/pig/trunk:1621676-1733612
+/pig/trunk:1621676-1737432
Modified: pig/branches/spark/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/branches/spark/CHANGES.txt?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/CHANGES.txt (original)
+++ pig/branches/spark/CHANGES.txt Fri Apr 1 20:19:53 2016
@@ -24,6 +24,14 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4847: POPartialAgg processing and spill improvements (rohini)
+
+PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of
no vertex groups (rohini)
+
+PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine
plan (rohini)
+
+PIG-4796: Authenticate with Kerberos using a keytab file (nielsbasjes via
daijy)
+
PIG-4817: Bump HTTP Logparser to version 2.4 (nielsbasjes via daijy)
PIG-4811: Upgrade groovy library to address MethodClosure vulnerability
(daijy)
@@ -97,6 +105,18 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4851: Null not padded when input has less fields than declared schema for
some loader (rohini)
+
+PIG-4850: Registered jars do not use submit replication (rdblue via cheolsoo)
+
+PIG-4845: Parallel instantiation of classes in Tez cause tasks to fail (rohini)
+
+PIG-4841: Inline-op with schema declaration fails with syntax error (knoguchi)
+
+PIG-4832: Fix TestPruneColumn NPE failure (kellyzly via daijy)
+
+PIG-4833: TestBuiltin.testURIWithCurlyBrace in TEZ failing after PIG-4819
(knoguchi)
+
PIG-4819: RANDOM() udf can lead to missing or redundant records (knoguchi)
PIG-4816: Read a null scalar causing a Tez failure (daijy)
Modified: pig/branches/spark/conf/pig.properties
URL:
http://svn.apache.org/viewvc/pig/branches/spark/conf/pig.properties?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/conf/pig.properties (original)
+++ pig/branches/spark/conf/pig.properties Fri Apr 1 20:19:53 2016
@@ -193,6 +193,18 @@
#
# pig.spill.gc.activation.size=40000000
+# Spill will be triggered if the fraction of Old Generation heap exceeds the
usage or collection threshold.
+# For bigger heap sizes, using a fixed size for collection and usage
thresholds will
+# utilize memory better than a percentage of the heap.
+# So usage threshold is calculated as
+# Max(HeapSize * pig.spill.memory.usage.threshold.fraction, HeapSize -
pig.spill.unused.memory.threshold.size)
+# So collection threshold is calculated as
+# Max(HeapSize * pig.spill.collection.threshold.fraction, HeapSize -
pig.spill.unused.memory.threshold.size)
+
+# pig.spill.memory.usage.threshold.fraction=0.7
+# pig.spill.collection.threshold.fraction=0.7
+# pig.spill.unused.memory.threshold.size=367001600
+
# Maximum amount of data to replicate using the distributed cache when doing
# fragment-replicated join. (default: 1000000000, about 1GB) Consider
increasing
# this in a production environment, but carefully.
@@ -331,6 +343,15 @@
#
# pig.exec.nocombiner=false
+# Enable or disable use of combiners only in reducer shuffle-merge phase.
+# pig.exec.nocombiner turns off combiner for both map and reduce phases.
+# Valid values are auto, true or false. Default is auto in which Pig turns off
combiner
+# on per combine plan basis when bags are present in a particular plan.
+# Value of true or false will apply to all combine plans in the script.
+# Currently this only applies to Tez as Mapreduce does not run combiners in
reducer (MAPREDUCE-5221).
+
+# pig.exec.nocombiner.reducer=auto
+
# EXPERIMENTAL: Aggregate records in map task before sending to the combiner?
# (default: false, 10; recommended: true, 10). In cases where there is a
massive
# reduction of data in the aggregation step, pig can do a first pass of
Modified:
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
(original)
+++
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
Fri Apr 1 20:19:53 2016
@@ -29,17 +29,19 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.MiniCluster;
import org.apache.pig.test.Util;
+import org.junit.Assert;
import org.junit.Test;
public class TestCSVStorage {
protected static final Log LOG = LogFactory.getLog(TestCSVStorage.class);
-
+
private PigServer pigServer;
private MiniCluster cluster;
-
+
public TestCSVStorage() throws ExecException, IOException {
cluster = MiniCluster.buildCluster();
pigServer = new PigServer(ExecType.LOCAL, new Properties());
@@ -59,8 +61,8 @@ public class TestCSVStorage {
Iterator<Tuple> it = pigServer.openIterator("a");
assertEquals(Util.createTuple(new String[] {"foo", "bar", "baz"}),
it.next());
}
-
- @Test
+
+ @Test
public void testQuotedCommas() throws IOException {
String inputFileName = "TestCSVLoader-quotedcommas.txt";
Util.createLocalInputFile(inputFileName, new String[]
{"\"foo,bar,baz\"", "fee,foe,fum"});
@@ -71,11 +73,11 @@ public class TestCSVStorage {
assertEquals(Util.createTuple(new String[] {"foo,bar,baz", null,
null}), it.next());
assertEquals(Util.createTuple(new String[] {"fee", "foe", "fum"}),
it.next());
}
-
+
@Test
public void testQuotedQuotes() throws IOException {
String inputFileName = "TestCSVLoader-quotedquotes.txt";
- Util.createLocalInputFile(inputFileName,
+ Util.createLocalInputFile(inputFileName,
new String[] {"\"foo,\"\"bar\"\",baz\"", "\"\"\"\"\"\"\"\""});
String script = "a = load '" + inputFileName + "' using
org.apache.pig.piggybank.storage.CSVLoader() " +
" as (a:chararray); ";
@@ -84,5 +86,21 @@ public class TestCSVStorage {
assertEquals(Util.createTuple(new String[] {"foo,\"bar\",baz"}),
it.next());
assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
}
-
+
+ @Test
+ public void testNullPadding() throws IOException {
+ String inputFileName = "TestCSVLoader-nullpadding.txt";
+ Util.createLocalInputFile(inputFileName, new String[] { "a", "b,",
"c,d", ",e"});
+ String script = "a = load '" + inputFileName + "' using
org.apache.pig.piggybank.storage.CSVLoader() " +
+ " as (field1, field2); dump a;";
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("a");
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray("a"), null}), it.next());
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray("b"), null}), it.next());
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray("c"), new DataByteArray("d")}), it.next());
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray(""), new DataByteArray("e")}), it.next());
+ Assert.assertFalse(it.hasNext());
+ }
+
+
}
Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
(original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Fri
Apr 1 20:19:53 2016
@@ -325,6 +325,60 @@ $ pig hdfs://nn.mydomain.com:9020/myscri
</section>
<!-- ==================================================================== -->
+ <section id="kerberos">
+ <title>Running jobs on a Kerberos secured cluster</title>
+ <p>Kerberos is an authentication system that uses tickets with a limited
validity time.<br/>
+ As a consequence running a pig script on a kerberos secured hadoop
cluster limits the running time to at most
+ the remaining validity time of these kerberos tickets. When doing
really complex analytics this may become a
+ problem as the job may need to run for a longer time than these ticket
times allow.</p>
+
+ <section id="kerberos-short">
+ <title>Short lived jobs</title>
+ <p>When running short jobs all you need to do is ensure that the user has
been logged in to Kerberos via the
+ normal kinit method.<br/>
+ The Hadoop job will automatically pick up these credentials and the job
will run fine.</p>
+ </section>
+
+ <section id="kerberos-long">
+ <title>Long lived jobs</title>
+ <p>A kerberos keytab file is essentially a Kerberos specific form of the
password of a user. <br/>
+ It is possible to enable a Hadoop job to request new tickets when they
expire by creating a keytab file and
+ making it part of the job that is running in the cluster.
+ This will extend the maximum job duration beyond the maximum renew
time of the kerberos tickets.</p>
+ <p>Usage:</p>
+ <ol>
+ <li>Create a keytab file for the required principal.<br/>
+ Using the ktutil tool you can create a keytab using roughly these
commands:<br/>
+ <source>addent -password -p niels@EXAMPLE.NL -k 1 -e rc4-hmac
+addent -password -p niels@EXAMPLE.NL -k 1 -e aes256-cts
+wkt niels.keytab</source>
+ </li>
+ <li>Set the following properties (either via the .pigrc file or on the
command line via -P file)
+ <ul>
+ <li><em>java.security.krb5.conf</em><br/>
+ The path to the local krb5.conf file.<br/>
+ Usually this is "/etc/krb5.conf"</li>
+ <li><em>hadoop.security.krb5.principal</em><br/>
+ The principal you want to log in with.<br/>
+ Usually this would look like this "niels@EXAMPLE.NL"</li>
+ <li><em>hadoop.security.krb5.keytab</em><br/>
+ The path to the local keytab file that must be used to
authenticate with.<br/>
+ Usually this would look like this
"/home/niels/.krb/niels.keytab"</li>
+ </ul></li>
+ </ol>
+ <p><strong>NOTE:</strong> All paths in these variables are local to the
client system starting the actual pig script.
+ This can be run without any special access to the cluster nodes.</p>
+
+ <p>Overall you would create a file that looks like this (assume we
call it niels.kerberos.properties):</p>
+ <source>java.security.krb5.conf=/etc/krb5.conf
+hadoop.security.krb5.principal=niels@EXAMPLE.NL
+hadoop.security.krb5.keytab=/home/niels/.krb/niels.keytab</source>
+ <p>and start your script like this:</p>
+ <source>pig -P niels.kerberos.properties script.pig</source>
+ </section>
+ </section>
+
+ <!-- ==================================================================== -->
<!-- PIG LATIN STATEMENTS -->
<section id="pl-statements">
Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Fri Apr 1
20:19:53 2016
@@ -94,6 +94,13 @@ public class PigConfiguration {
public static final String PIG_EXEC_NO_COMBINER = "pig.exec.nocombiner";
/**
+ * Enable or disable use of combiners in reducer shuffle-merge phase in
Tez.
+ * Valid values are auto, true or false.
+ * Default is auto which turns off combiner if bags are present in the
combine plan
+ */
+ public static final String PIG_EXEC_NO_COMBINER_REDUCER =
"pig.exec.nocombiner.reducer";
+
+ /**
* This key controls whether secondary sort key is used for optimization
in case
* of nested distinct or sort
*/
@@ -328,7 +335,7 @@ public class PigConfiguration {
* Set the threshold for percentage of errors
*/
public static final String PIG_ERROR_THRESHOLD_PERCENT =
"pig.error.threshold.percent";
-
+
/**
* Comma-delimited entries of commands/operators that must be disallowed.
* This is a security feature to be used by administrators to block use of
@@ -380,6 +387,29 @@ public class PigConfiguration {
public static final String PIG_TEZ_DAG_STATUS_REPORT_INTERVAL =
"pig.tez.dag.status.report.interval";
+ // SpillableMemoryManager settings
+
+ /**
+ * Spill will be triggered if the fraction of biggest heap exceeds the
usage threshold.
+ * If {@link PigConfiguration#PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE} is
non-zero, then usage threshold is calculated as
+ * Max(HeapSize * PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, HeapSize -
PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE)
+ * Default is 0.7
+ */
+ public static final String PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION =
"pig.spill.memory.usage.threshold.fraction";
+
+ /**
+ * Spill will be triggered if the fraction of biggest heap exceeds the
collection threshold.
+ * If {@link PigConfiguration#PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE} is
non-zero, then collection threshold is calculated as
+ * Max(HeapSize * PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, HeapSize -
PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE)
+ * Default is 0.7
+ */
+ public static final String PIG_SPILL_COLLECTION_THRESHOLD_FRACTION =
"pig.spill.collection.threshold.fraction";
+
+ /**
+ * Spill will be triggered when unused memory falls below the threshold.
+ * Default is 350MB
+ */
+ public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE =
"pig.spill.unused.memory.threshold.size";
// Deprecated settings of Pig 0.13
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
Fri Apr 1 20:19:53 2016
@@ -37,6 +37,7 @@ import org.apache.pig.backend.BackendExc
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.HKerberos;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
@@ -215,6 +216,9 @@ public abstract class HExecutionEngine i
// the properties
Utils.recomputeProperties(jc, properties);
+ // Ensure we have been logged in using the kerberos keytab (if
provided) before continuing.
+ HKerberos.tryKerberosKeytabLogin(jc);
+
cluster = jc.get(MRConfiguration.JOB_TRACKER);
nameNode = jc.get(FILE_SYSTEM_LOCATION);
if (nameNode == null) {
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Fri Apr 1 20:19:53 2016
@@ -1783,6 +1783,7 @@ public class JobControlCompiler{
PigContext pigContext,
Configuration conf,
URL url) throws IOException {
+ short replication = (short)
conf.getInt(MRConfiguration.SUMIT_REPLICATION, 3);
boolean cacheEnabled =
conf.getBoolean(PigConfiguration.PIG_USER_CACHE_ENABLED,
false);
@@ -1809,6 +1810,7 @@ public class JobControlCompiler{
os.close();
}
}
+ fs.setReplication(dst, replication);
return dst;
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
Fri Apr 1 20:19:53 2016
@@ -33,7 +33,6 @@ import org.apache.log4j.PropertyConfigur
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -161,7 +160,7 @@ public abstract class PigGenericMapBase
super.setup(context);
Configuration job = context.getConfiguration();
- SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+ SpillableMemoryManager.getInstance().configure(job);
context.getConfiguration().set(PigConstants.TASK_INDEX,
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
PigMapReduce.sJobContext = context;
PigMapReduce.sJobConfInternal.set(context.getConfiguration());
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
Fri Apr 1 20:19:53 2016
@@ -34,7 +34,6 @@ import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -313,7 +312,7 @@ public class PigGenericMapReduce {
if (inIllustrator)
pack = getPack(context);
Configuration jConf = context.getConfiguration();
-
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+ SpillableMemoryManager.getInstance().configure(jConf);
context.getConfiguration().set(PigConstants.TASK_INDEX,
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
sJobContext = context;
sJobConfInternal.set(context.getConfiguration());
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
Fri Apr 1 20:19:53 2016
@@ -491,6 +491,11 @@ public class POUserFunc extends Expressi
}
}
+ public Type getOriginalReturnType() throws ExecException {
+ instantiateFunc(origFSpec);
+ return func.getReturnType();
+ }
+
public Type getReturnType() {
return func.getReturnType();
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
Fri Apr 1 20:19:53 2016
@@ -141,7 +141,7 @@ public class POPartialAgg extends Physic
}
private void init() throws ExecException {
- ALL_POPARTS.put(this, null);
+
numRecsInRawMap = 0;
numRecsInProcessedMap = 0;
minOutputReduction = DEFAULT_MIN_REDUCTION;
@@ -172,6 +172,9 @@ public class POPartialAgg extends Physic
// Set them to true instead of adding another check for
!disableMapAgg
sizeReductionChecked = true;
estimatedMemThresholds = true;
+ } else {
+ ALL_POPARTS.put(this, null);
+ SpillableMemoryManager.getInstance().registerSpillable(this);
}
// Avoid hashmap resizing. TODO: Investigate loadfactor of 0.90 or 1.0
// newHashMapWithExpectedSize does new HashMap(expectedSize +
expectedSize/3)
@@ -187,7 +190,6 @@ public class POPartialAgg extends Physic
listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
}
initialized = true;
- SpillableMemoryManager.getInstance().registerSpillable(this);
}
@Override
@@ -203,7 +205,7 @@ public class POPartialAgg extends Physic
// The fact that we are in the latter stage is communicated via the
doSpill
// flag.
- if (!initialized && !ALL_POPARTS.containsKey(this)) {
+ if (!initialized) {
init();
}
@@ -224,8 +226,12 @@ public class POPartialAgg extends Physic
if (doSpill == false) {
// SpillableMemoryManager requested a spill to reduce
memory
// consumption. See if we can avoid it.
+ // If we see that there are way more records in processed
map than
+ // raw map, it is better to spill after the aggregation
+ boolean spillProcessedMap = (3 * numRecsInRawMap) <
numRecsInProcessedMap;
aggregateBothLevels(false, false);
- if (shouldSpill()) {
+ // Spill if the processed map held far more records than the raw map, or if shouldSpill() requires it
+ if (spillProcessedMap || shouldSpill()) {
startSpill(false);
} else {
LOG.info("Avoided emitting records during spill memory
call.");
@@ -360,6 +366,7 @@ public class POPartialAgg extends Physic
// called and size reduction checked
startSpill(false);
disableMapAgg = true;
+ ALL_POPARTS.remove(this);
}
private boolean mapAggDisabled() {
@@ -379,19 +386,21 @@ public class POPartialAgg extends Physic
return shouldAggregateSecondLevel();
}
+ private void addKeySingleValToMap(Map<Object, List<Tuple>> map,
+ Object key, List<Tuple> inpTuple) throws ExecException {
+ List<Tuple> value = map.get(key);
+ if (value == null) {
+ map.put(key, inpTuple);
+ } else {
+ value.add(inpTuple.get(0));
+ }
+ }
+
private void addKeyValToMap(Map<Object, List<Tuple>> map,
Object key, Tuple inpTuple) throws ExecException {
List<Tuple> value = map.get(key);
if (value == null) {
- if (isGroupAll) {
- // Set exact array initial size to avoid array copies
- // listSizeThreshold = numRecordsToSample before estimating
memory
- // thresholds and firstTierThreshold after memory estimation
- int listSize = (map == rawInputMap) ? listSizeThreshold :
Math.min(secondTierThreshold, MAX_LIST_SIZE);
- value = new ArrayList<Tuple>(listSize);
- } else {
- value = new ArrayList<Tuple>();
- }
+ value = createNewValueList(map);
map.put(key, value);
}
value.add(inpTuple);
@@ -409,6 +418,20 @@ public class POPartialAgg extends Physic
}
}
+ private List<Tuple> createNewValueList(Map<Object, List<Tuple>> map) {
+ List<Tuple> value;
+ if (isGroupAll) {
+ // Set exact array initial size to avoid array copies
+ // listSizeThreshold = numRecordsToSample before estimating memory
+ // thresholds and firstTierThreshold after memory estimation
+ int listSize = (map == rawInputMap) ? listSizeThreshold :
Math.min(secondTierThreshold + 1, MAX_LIST_SIZE);
+ value = new ArrayList<Tuple>(listSize);
+ } else {
+ value = new ArrayList<Tuple>();
+ }
+ return value;
+ }
+
private void startSpill(boolean aggregate) throws ExecException {
// If spillingIterator is not null, we are already spilling and don't need
to set up.
if (spillingIterator != null) return;
@@ -452,13 +475,38 @@ public class POPartialAgg extends Physic
* @throws ExecException
*/
private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object,
List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
+ boolean srcDestDifferent = (fromMap == toMap) ? false : true;
Iterator<Map.Entry<Object, List<Tuple>>> iter =
fromMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Object, List<Tuple>> entry = iter.next();
- Tuple valueTuple = createValueTuple(entry.getKey(),
entry.getValue());
- Result res = getOutput(entry.getKey(), valueTuple);
- iter.remove();
- addKeyValToMap(toMap, entry.getKey(),
getAggResultTuple(res.result));
+ if (entry.getValue().size() == 1) {
+ // If fromMap and toMap are same (processedInputMap), then
continue without any change
+ // If different (rawInputMap->processedInputMap), add directly
to target and skip running through the valuePlans
+ if (srcDestDifferent) {
+ iter.remove();
+ addKeySingleValToMap(toMap, entry.getKey(),
entry.getValue());
+ }
+ } else {
+ Tuple valueTuple = createValueTuple(entry.getKey(),
entry.getValue());
+ Result res = getOutput(entry.getKey(), valueTuple);
+ if (srcDestDifferent) {
+ // Remove from src and add to destination
(rawInputMap->processedInputMap)
+ iter.remove();
+ addKeyValToMap(toMap, entry.getKey(),
getAggResultTuple(res.result));
+ } else {
+ // Update processedInputMap in place
+ List<Tuple> value = entry.getValue();
+ if (isGroupAll) {
+ value.clear();
+ } else {
+ // Creating a new list to free more space instead of
+ // calling clear as the same key might not repeat again
+ value = createNewValueList(toMap);
+ toMap.put(entry.getKey(), value);
+ }
+ value.add(getAggResultTuple(res.result));
+ }
+ }
numEntriesInTarget++;
}
return numEntriesInTarget;
@@ -495,9 +543,7 @@ public class POPartialAgg extends Physic
return;
}
int processedTuples = numRecsInProcessedMap;
- Map<Object, List<Tuple>> newMap =
Maps.newHashMapWithExpectedSize(processedInputMap.size());
- numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
- processedInputMap = newMap;
+ numRecsInProcessedMap = aggregate(processedInputMap,
processedInputMap, 0);
LOG.info("Aggregated " + processedTuples + " processed tuples to " +
numRecsInProcessedMap + " tuples");
}
@@ -643,14 +689,13 @@ public class POPartialAgg extends Physic
LOG.info("Spill triggered by SpillableMemoryManager, but
previous spill call is still not processed. Skipping");
return 0;
}
- LOG.info("Spill triggered by SpillableMemoryManager");
synchronized(spillLock) {
if (rawInputMap != null) {
- LOG.info("Memory usage: " + getMemorySize()
- + ". Raw map: num keys = " + rawInputMap.size()
- + ", num tuples = "+ numRecsInRawMap
- + ", Processed map: num keys = " +
processedInputMap.size()
- + ", num tuples = "+ numRecsInProcessedMap );
+ LOG.info("Spill triggered. Memory usage: " +
getMemorySize()
+ + ". Raw map: keys = " + rawInputMap.size()
+ + ", tuples = "+ numRecsInRawMap
+ + ", Processed map: keys = " +
processedInputMap.size()
+ + ", tuples = "+ numRecsInProcessedMap );
}
startedContingentSpill = false;
doContingentSpill = true;
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Fri Apr 1 20:19:53 2016
@@ -103,6 +103,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import
org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
@@ -507,8 +508,28 @@ public class TezDagBuilder extends TezOp
edge.partitionerClass.getName());
}
- in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+ UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+ out.setUserPayload(payLoad);
+
+ if (!combinePlan.isEmpty()) {
+ boolean noCombineInReducer = false;
+ String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
+ if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+ noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
+ } else {
+ noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
+ }
+ if (noCombineInReducer) {
+ log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
+ conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
+ conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
+ conf.unset("pig.combinePlan");
+ conf.unset("pig.combine.package");
+ conf.unset("pig.map.keytype");
+ payLoad = TezUtils.createUserPayloadFromConf(conf);
+ }
+ }
+ in.setUserPayload(payLoad);
if (edge.dataMovementType!=DataMovementType.BROADCAST &&
to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 &&
(to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
Fri Apr 1 20:19:53 2016
@@ -55,6 +55,14 @@ public class TezResourceManager {
return instance;
}
+
+ /**
+ * This method is only used by test code to reset state.
+ */
+ public static void dropInstance() {
+ instance = null;
+ }
+
public void init(PigContext pigContext, Configuration conf) throws
IOException {
if (!inited) {
this.resourcesDir =
FileLocalizer.getTemporaryResourcePath(pigContext);
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
Fri Apr 1 20:19:53 2016
@@ -18,8 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -68,10 +70,23 @@ public class MultiQueryOptimizerTez exte
return;
}
+ // Using a list instead of set to have consistently ordered plans
List<TezOperator> splittees = new ArrayList<TezOperator>();
Set<TezOperator> mergedNonPackageInputSuccessors = new
HashSet<TezOperator>();
+ // When there is a union successor with unsupported storefunc, those splittees
+ // can only be merged into the split if all the union members will be from the split
+ // This is to ensure that there are no vertex groups created with unsupported storefunc.
+ Map<TezOperator, Set<OperatorKey>> tentativeMergeUnionMembers = new HashMap<TezOperator, Set<OperatorKey>>();
+
List<TezOperator> successors = getPlan().getSuccessors(tezOp);
+
+ Set<OperatorKey> splitterAndSuccessorKeys = new HashSet<OperatorKey>();
+ splitterAndSuccessorKeys.add(tezOp.getOperatorKey());
+ for (TezOperator successor : successors) {
+ splitterAndSuccessorKeys.add(successor.getOperatorKey());
+ }
+
for (TezOperator successor : successors) {
List<TezOperator> predecessors = new
ArrayList<TezOperator>(getPlan().getPredecessors(successor));
predecessors.remove(tezOp);
@@ -129,6 +144,7 @@ public class MultiQueryOptimizerTez exte
// Only in case of POPackage(POShuffleTezLoad) multiple inputs
can be handled from a Split
Set<TezOperator> nonPackageInputSuccessors = new
HashSet<TezOperator>();
boolean canMerge = true;
+ Set<TezOperator> successorUnsupportedStoreUnions = new
HashSet<TezOperator>();
mergedSuccessors.addAll(successors);
for (TezOperator splittee : splittees) {
@@ -140,11 +156,21 @@ public class MultiQueryOptimizerTez exte
for (TezOperator succSuccessor :
getPlan().getSuccessors(successor)) {
if (succSuccessor.isUnion()) {
if (!(unionOptimizerOn &&
- UnionOptimizer.isOptimizable(succSuccessor,
- unionSupportedStoreFuncs,
- unionUnsupportedStoreFuncs))) {
+ UnionOptimizer.isOptimizable(succSuccessor))) {
toNotMergeSuccessors.add(succSuccessor);
} else {
+ if (unionOptimizerOn && !UnionOptimizer.isOptimizableStoreFunc(succSuccessor,unionSupportedStoreFuncs,unionUnsupportedStoreFuncs)) {
+ // This optimization of using UnionOptimizer for unsupported storefunc
+ // is only good for one level of split and does not handle multiple level of split.
+ Set<OperatorKey> unionMembers = new HashSet<OperatorKey>(succSuccessor.getUnionMembers());
+ unionMembers.removeAll(splitterAndSuccessorKeys);
+ if(unionMembers.isEmpty()) {
+ successorUnsupportedStoreUnions.add(succSuccessor);
+ } else {
+ toNotMergeSuccessors.add(succSuccessor);
+ continue;
+ }
+ }
toMergeSuccessors.add(succSuccessor);
List<TezOperator> unionSuccessors =
getPlan().getSuccessors(succSuccessor);
if (unionSuccessors != null) {
@@ -187,11 +213,47 @@ public class MultiQueryOptimizerTez exte
mergedSuccessors.retainAll(toNotMergeSuccessors);
if (mergedSuccessors.isEmpty()) { // no shared edge after merge
- splittees.add(successor);
mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
+ if (successorUnsupportedStoreUnions.isEmpty()) {
+ splittees.add(successor);
+ } else {
+ // If all other conditions were satisfied, but it had a successor union
+ // with unsupported storefunc keep it in the tentative list
+ for (TezOperator unionOp : successorUnsupportedStoreUnions) {
+ Set<OperatorKey> tentativeSuccessors = tentativeMergeUnionMembers.get(unionOp);
+ if (tentativeSuccessors == null) {
+ tentativeSuccessors = new HashSet<OperatorKey>();
+ tentativeMergeUnionMembers.put(unionOp, tentativeSuccessors);
+ }
+ tentativeSuccessors.add(successor.getOperatorKey());
+ }
+ }
}
}
+ Set<TezOperator> spliteesToRemove = new HashSet<TezOperator>();
+
+ for (Entry<TezOperator, Set<OperatorKey>> entry : tentativeMergeUnionMembers.entrySet()) {
+ Set<OperatorKey> unionMembers = new HashSet<OperatorKey>(entry.getKey().getUnionMembers());
+ if (entry.getValue().containsAll(unionMembers)) {
+ // If all the union members were tentative splittees then add them
+ for (OperatorKey key : entry.getValue()) {
+ TezOperator splittee = getPlan().getOperator(key);
+ if (!splittees.contains(splittee)) {
+ splittees.add(splittee);
+ }
+ }
+ } else {
+ for (OperatorKey key : entry.getValue()) {
+ spliteesToRemove.add(getPlan().getOperator(key));
+ }
+ }
+ }
+
+ for (TezOperator op : spliteesToRemove) {
+ splittees.remove(op);
+ }
+
if (splittees.size() == 0) {
return;
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
Fri Apr 1 20:19:53 2016
@@ -104,9 +104,7 @@ public class UnionOptimizer extends TezO
this.unsupportedStoreFuncs = unsupportedStoreFuncs;
}
- public static boolean isOptimizable(TezOperator tezOp,
- List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
- throws VisitorException {
+ public static boolean isOptimizable(TezOperator tezOp) throws VisitorException {
if((tezOp.isLimit() || tezOp.isLimitAfterSort()) &&
tezOp.getRequestedParallelism() == 1) {
return false;
}
@@ -116,6 +114,12 @@ public class UnionOptimizer extends TezO
if (tezOp.isRankCounter()) {
return false;
}
+ return true;
+ }
+
+ public static boolean isOptimizableStoreFunc(TezOperator tezOp,
+ List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
+ throws VisitorException {
if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
List<POStoreTez> stores =
PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
for (POStoreTez store : stores) {
@@ -148,7 +152,7 @@ public class UnionOptimizer extends TezO
return;
}
- if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
+ if (!isOptimizable(tezOp)) {
return;
}
@@ -156,34 +160,40 @@ public class UnionOptimizer extends TezO
String scope = unionOp.getOperatorKey().scope;
PhysicalPlan unionOpPlan = unionOp.plan;
- // TODO: PIG-3856 Handle replicated join and skewed join sample.
- // Replicate join small table/skewed join sample that was broadcast to union vertex
- // now needs to be broadcast to all the union predecessors. How do we do that??
- // Wait for shared edge and do it or write multiple times??
- // For now don't optimize except in the case of Split where we need to write only once
-
Set<OperatorKey> uniqueUnionMembers = new
HashSet<OperatorKey>(unionOp.getUnionMembers());
List<TezOperator> predecessors = new
ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null
? null
: new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
+ if (uniqueUnionMembers.size() != 1) {
+
+ if (!isOptimizableStoreFunc(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
+ return;
+ }
- if (successors != null && uniqueUnionMembers.size() > 1) {
- for (TezOperator succ : successors) {
- for (TezOperator pred : predecessors) {
- if (succ.inEdges.containsKey(pred.getOperatorKey())) {
- // Stop here, we cannot convert the node into vertex group
- // Otherwise, we will end up with a parallel edge between pred
- // and succ
- return;
+ if (successors != null) {
+ for (TezOperator succ : successors) {
+ for (TezOperator pred : predecessors) {
+ if (succ.inEdges.containsKey(pred.getOperatorKey())) {
+ // Stop here, we cannot convert the node into vertex group
+ // Otherwise, we will end up with a parallel edge between pred
+ // and succ
+ return;
+ }
}
}
}
+
+ // TODO: PIG-3856 Handle replicated join and skewed join sample.
+ // Replicate join small table/skewed join sample that was broadcast to union vertex
+ // now needs to be broadcast to all the union predecessors. How do we do that??
+ // Wait for shared edge and do it or write multiple times??
+ // For now don't optimize except in the case of Split where we need to write only once
+ if (predecessors.size() > unionOp.getUnionMembers().size()) {
+ return;
+ }
}
- if (predecessors.size() > unionOp.getUnionMembers().size()
- && uniqueUnionMembers.size() != 1) {
- return; // TODO: PIG-3856
- }
+
if (uniqueUnionMembers.size() == 1) {
// We actually don't need VertexGroup in this case. The multiple
// sub-plans of Split can write to same MROutput or the Tez
LogicalOutput
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
Fri Apr 1 20:19:53 2016
@@ -56,6 +56,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -128,6 +129,7 @@ public class PigProcessor extends Abstra
UserPayload payload = getContext().getUserPayload();
conf = TezUtils.createConfFromUserPayload(payload);
+ SpillableMemoryManager.getInstance().configure(conf);
PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
.deserialize(conf.get("udf.import.list")));
PigContext pc = (PigContext)
ObjectSerializer.deserialize(conf.get("pig.pigContext"));
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
Fri Apr 1 20:19:53 2016
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez.util;
import java.io.IOException;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.builtin.TOBAG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
@@ -273,4 +275,28 @@ public class TezCompilerUtil {
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
}
+ public static boolean bagDataTypeInCombinePlan(PhysicalPlan combinePlan) throws ExecException {
+ PhysicalOperator lr = combinePlan.getLeaves().get(0);
+ POForEach fe = (POForEach) combinePlan.getPredecessors(lr).get(0);
+
+ // Hack. class.getTypeName() is only available in JDK8
+ Type dataBagType = new TOBAG().getReturnType();
+
+ List<PhysicalPlan> inputPlans = fe.getInputPlans();
+ for (PhysicalPlan inputPlan: inputPlans) {
+ PhysicalOperator leaf = inputPlan.getLeaves().get(0);
+ if (leaf.getResultType() == DataType.BAG) {
+ return true;
+ } else if (leaf instanceof POUserFunc) {
+ POUserFunc func = (POUserFunc) leaf;
+ // Return type of Intermediate func in combiner plan is always Tuple.
+ // Need to check original or Final EvalFunc return type
+ if (dataBagType.equals(func.getOriginalReturnType())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Apr 1
20:19:53 2016
@@ -73,7 +73,6 @@ import org.apache.pig.bzip2r.Bzip2TextIn
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.CastUtils;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -178,7 +177,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
overwrite.setArgs(1);
overwrite.setArgName("overwrite");
validOptions.addOption(overwrite);
-
+
return validOptions;
}
@@ -225,7 +224,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
if ("true".equalsIgnoreCase(value)) {
overwriteOutput = true;
}
- }
+ }
dontLoadSchema = configuredOptions.hasOption("noschema");
tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -302,31 +301,18 @@ LoadPushDown, LoadMetadata, StoreMetadat
Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass(),
new String[] {signature});
String serializedSchema = p.getProperty(signature+".schema");
- if (serializedSchema != null) {
- try {
- schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
- } catch (ParserException e) {
- mLog.error("Unable to parse serialized schema " + serializedSchema, e);
- // all bets are off - there's no guarantee that we'll return
- // either the fields in the data or the fields in the schema
- // the user specified (or required)
- }
+ if (serializedSchema == null) return tup;
+ try {
+ schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
+ } catch (ParserException e) {
+ mLog.error("Unable to parse serialized schema " + serializedSchema, e);
+ // all bets are off - there's no guarantee that we'll return
+ // either the fields in the data or the fields in the schema
+ // the user specified (or required)
}
}
- if (schema == null) {
- // if the number of required fields are less than or equal to
- // the number of fields in the data then we're OK as we've already
- // read only the required number of fields into the tuple. If
- // more fields are required than are in the data then we'll pad
- // with nulls:
- int numRequiredColumns = 0;
- for (int i = 0; mRequiredColumns != null && i < mRequiredColumns.length; i++)
- if(mRequiredColumns[i])
- ++numRequiredColumns;
- for (int i = tup.size();i < numRequiredColumns; ++i)
- tup.append(null);
- } else {
+ if (schema != null) {
ResourceFieldSchema[] fieldSchemas = schema.getFields();
int tupleIdx = 0;
// If some fields have been projected out, the tuple
@@ -338,7 +324,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
if (tupleIdx >= tup.size()) {
tup.append(null);
}
-
+
Object val = null;
if(tup.get(tupleIdx) != null){
byte[] bytes = ((DataByteArray)
tup.get(tupleIdx)).get();
Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Fri Apr 1
20:19:53 2016
@@ -77,13 +77,13 @@ public class PigContext implements Seria
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(PigContext.class);
+ private static Object instantiationLock = new Object();
public static final String JOB_NAME = "jobName";
public static final String JOB_NAME_PREFIX= "PigLatin";
public static final String JOB_PRIORITY = "jobPriority";
public static final String PIG_CMD_ARGS_REMAINDERS =
"pig.cmd.args.remainders";
-
/* NOTE: we only serialize some of the stuff
*
*(to make it smaller given that it's not all needed on the Hadoop side,
@@ -729,26 +729,41 @@ public class PigContext implements Seria
throw new RuntimeException("Cannot instantiate: " + className,
ioe) ;
}
- try {
- // Do normal instantiation
- if (args != null && args.length > 0) {
- Class paramTypes[] = new Class[args.length];
- for (int i = 0; i < paramTypes.length; i++) {
- paramTypes[i] = String.class;
+ // OptionBuilder is not thread-safe and HBaseStorage, elephantbird SequenceFileConfig, etc
+ // use them in constructor. This leads to NoSuchMethodException, UnrecognizedOptionException etc
+ // when processor, inputs and outputs are initialized in parallel in Tez
+ synchronized (instantiationLock) {
+ try {
+ // Do normal instantiation
+ if (args != null && args.length > 0) {
+ Class paramTypes[] = new Class[args.length];
+ for (int i = 0; i < paramTypes.length; i++) {
+ paramTypes[i] = String.class;
+ }
+ Constructor c = objClass.getConstructor(paramTypes);
+ ret = c.newInstance((Object[])args);
+ } else {
+ ret = objClass.newInstance();
}
- Constructor c = objClass.getConstructor(paramTypes);
- ret = c.newInstance((Object[])args);
- } else {
- ret = objClass.newInstance();
}
- }
- catch(NoSuchMethodException nme) {
- // Second chance. Try with var arg constructor
- try {
- Constructor c = objClass.getConstructor(String[].class);
- Object[] wrappedArgs = new Object[1] ;
- wrappedArgs[0] = args ;
- ret = c.newInstance(wrappedArgs);
+ catch(NoSuchMethodException nme) {
+ // Second chance. Try with var arg constructor
+ try {
+ Constructor c = objClass.getConstructor(String[].class);
+ Object[] wrappedArgs = new Object[1] ;
+ wrappedArgs[0] = args ;
+ ret = c.newInstance(wrappedArgs);
+ }
+ catch(Throwable e){
+ // bad luck
+ StringBuilder sb = new StringBuilder();
+ sb.append("could not instantiate '");
+ sb.append(className);
+ sb.append("' with arguments '");
+ sb.append(Arrays.toString(args));
+ sb.append("'");
+ throw new RuntimeException(sb.toString(), e);
+ }
}
catch(Throwable e){
// bad luck
@@ -760,18 +775,8 @@ public class PigContext implements Seria
sb.append("'");
throw new RuntimeException(sb.toString(), e);
}
+ return ret;
}
- catch(Throwable e){
- // bad luck
- StringBuilder sb = new StringBuilder();
- sb.append("could not instantiate '");
- sb.append(className);
- sb.append("' with arguments '");
- sb.append(Arrays.toString(args));
- sb.append("'");
- throw new RuntimeException(sb.toString(), e);
- }
- return ret;
}
public static Object instantiateFuncFromSpec(String funcSpec) {
Modified:
pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Fri Apr 1 20:19:53 2016
@@ -27,7 +27,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Properties;
import javax.management.Notification;
import javax.management.NotificationEmitter;
@@ -36,6 +35,8 @@ import javax.management.openmbean.Compos
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigConfiguration;
/**
* This class Tracks the tenured pool and a list of Spillable objects. When
memory gets low, this
@@ -50,6 +51,11 @@ public class SpillableMemoryManager impl
private static final Log log =
LogFactory.getLog(SpillableMemoryManager.class);
+ private static final int ONE_GB = 1024 * 1024 * 1024;
+ private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 1024;
+ private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
+ private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
+
private LinkedList<WeakReference<Spillable>> spillables = new
LinkedList<WeakReference<Spillable>>();
// References to spillables with size
private LinkedList<SpillablePtr> spillablesSR = null;
@@ -68,13 +74,9 @@ public class SpillableMemoryManager impl
// and between GC invocations
private long accumulatedFreeSize = 0L;
- // fraction of biggest heap for which we want to get
- // "memory usage threshold exceeded" notifications
- private double memoryThresholdFraction = 0.7;
-
- // fraction of biggest heap for which we want to get
- // "collection threshold exceeded" notifications
- private double collectionMemoryThresholdFraction = 0.5;
+ private long memoryThresholdSize = 0L;
+
+ private long collectionThresholdSize = 0L;
// log notification on usage threshold exceeded only the first time
private boolean firstUsageThreshExceededLogged = false;
@@ -89,6 +91,8 @@ public class SpillableMemoryManager impl
private volatile boolean blockRegisterOnSpill = false;
+ private MemoryPoolMXBean tenuredHeap;
+
private static final SpillableMemoryManager manager = new
SpillableMemoryManager();
//@StaticDataCleanup
@@ -100,8 +104,6 @@ public class SpillableMemoryManager impl
private SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this,
null, null);
List<MemoryPoolMXBean> mpbeans =
ManagementFactory.getMemoryPoolMXBeans();
- MemoryPoolMXBean tenuredHeap = null;
- long tenuredHeapSize = 0;
long totalSize = 0;
for (MemoryPoolMXBean pool : mpbeans) {
log.debug("Found heap (" + pool.getName() + ") of type " +
pool.getType());
@@ -111,7 +113,6 @@ public class SpillableMemoryManager impl
// CMS Old Gen or "tenured" is the only heap that supports
// setting usage threshold.
if (pool.isUsageThresholdSupported()) {
- tenuredHeapSize = size;
tenuredHeap = pool;
}
}
@@ -120,8 +121,39 @@ public class SpillableMemoryManager impl
if (tenuredHeap == null) {
throw new RuntimeException("Couldn't find heap");
}
- log.debug("Selected heap to monitor (" +
- tenuredHeap.getName() + ")");
+
+ configureMemoryThresholds(MEMORY_THRESHOLD_FRACTION_DEFAULT,
+ COLLECTION_THRESHOLD_FRACTION_DEFAULT,
+ UNUSED_MEMORY_THRESHOLD_DEFAULT);
+
+ }
+
+ /**
+ * Configure thresholds for memory usage/collection threshold exceeded notifications.
+ * Uses memoryThresholdFraction and collectionMemoryThresholdFraction to configure thresholds
+ * for heap sizes less than 1GB and unusedMemoryThreshold for bigger heaps.
+ *
+ * @param memoryThresholdFraction
+ * fraction of biggest heap for which we want to get memory usage
+ * threshold exceeded notifications
+ * @param collectionMemoryThresholdFraction
+ * fraction of biggest heap for which we want to get collection
+ * threshold exceeded notifications
+ * @param unusedMemoryThreshold
+ * Unused memory size below which we want to get notifications
+ */
+ private void configureMemoryThresholds(double memoryThresholdFraction, double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+ long tenuredHeapSize = tenuredHeap.getUsage().getMax();
+ memoryThresholdSize = (long)(tenuredHeapSize * memoryThresholdFraction);
+ collectionThresholdSize = (long)(tenuredHeapSize * collectionMemoryThresholdFraction);
+ if (unusedMemoryThreshold > 0) {
+ // For a 1G heap we will be spilling around ~700MB with 300MB still unused with default 0.7 threshold
+ // For bigger heaps, we still want to spill when there is 300MB unused (plus another 50MB for buffer) and not at 70%.
+ // For eg: For 4G we want to start spilling at 3.65GB and not at 2.8GB(70%) for better use of memory
+ long unusedThreshold = tenuredHeapSize - unusedMemoryThreshold;
+ memoryThresholdSize = Math.max(memoryThresholdSize, unusedThreshold);
+ collectionThresholdSize = Math.max(collectionThresholdSize, unusedThreshold);
+ }
// we want to set both collection and usage threshold alerts to be
// safe. In some local tests after a point only collection threshold
@@ -133,31 +165,29 @@ public class SpillableMemoryManager impl
/* We set the threshold to be 50% of tenured since that is where
* the GC starts to dominate CPU time according to Sun doc */
- tenuredHeap.setCollectionUsageThreshold((long)(tenuredHeapSize * collectionMemoryThresholdFraction));
+ tenuredHeap.setCollectionUsageThreshold((long)(collectionThresholdSize));
// we set a higher threshold for usage threshold exceeded notification
// since this is more likely to be effective sooner and we do not
// want to be spilling too soon
- tenuredHeap.setUsageThreshold((long)(tenuredHeapSize * memoryThresholdFraction));
+
+ tenuredHeap.setUsageThreshold((long)(memoryThresholdSize));
+ log.info("Selected heap (" + tenuredHeap.getName() + ")" + " of size " + tenuredHeapSize
+ + " to monitor. collectionUsageThreshold = " + tenuredHeap.getCollectionUsageThreshold()
+ + ", usageThreshold = " + tenuredHeap.getUsageThreshold() );
}
public static SpillableMemoryManager getInstance() {
return manager;
}
- public static void configure(Properties properties) {
-
- try {
+ public void configure(Configuration conf) {
- spillFileSizeThreshold = Long.parseLong(
- properties.getProperty("pig.spill.size.threshold") ) ;
-
- gcActivationSize = Long.parseLong(
- properties.getProperty("pig.spill.gc.activation.size") ) ;
- }
- catch (NumberFormatException nfe) {
- throw new RuntimeException("Error while converting system configurations" +
- "spill.size.threshold, spill.gc.activation.size", nfe) ;
- }
+ spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", spillFileSizeThreshold);
+ gcActivationSize = conf.getLong("pig.spill.gc.activation.size", gcActivationSize);
+ double memoryThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
+ double collectionThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
+ long unusedMemoryThreshold = conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE, UNUSED_MEMORY_THRESHOLD_DEFAULT);
+ configureMemoryThresholds(memoryThresholdFraction, collectionThresholdFraction, unusedMemoryThreshold);
}
@Override
@@ -169,12 +199,11 @@ public class SpillableMemoryManager impl
// used - heapmax/2 + heapmax/4
long toFree = 0L;
if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
- long threshold = (long)(info.getUsage().getMax() * memoryThresholdFraction);
- toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
+ toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
//log
String msg = "memory handler call- Usage threshold "
- + info.getUsage();
+ + info.getUsage() + ", toFree = " + toFree;
if(!firstUsageThreshExceededLogged){
log.info("first " + msg);
firstUsageThreshExceededLogged = true;
@@ -182,12 +211,11 @@ public class SpillableMemoryManager impl
log.debug(msg);
}
} else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
- long threshold = (long)(info.getUsage().getMax() * collectionMemoryThresholdFraction);
- toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
+ toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
//log
String msg = "memory handler call - Collection threshold "
- + info.getUsage();
+ + info.getUsage() + ", toFree = " + toFree;
if(!firstCollectionThreshExceededLogged){
log.info("first " + msg);
firstCollectionThreshExceededLogged = true;
Modified:
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
Fri Apr 1 20:19:53 2016
@@ -18,18 +18,13 @@
package org.apache.pig.newplan.logical.rules;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
-import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.CastExpression;
@@ -58,7 +53,7 @@ public abstract class TypeCastInserter e
public Transformer getNewTransformer() {
return new TypeCastInserterTransformer();
}
-
+
public class TypeCastInserterTransformer extends Transformer {
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
@@ -77,18 +72,15 @@ public abstract class TypeCastInserter e
}
// Now that we've narrowed it down to an operation that *can* have casts added,
- // (because the user specified some types which might not match the data) let's
+ // (because the user specified some types which might not match the data) let's
// see if they're actually needed:
LogicalSchema determinedSchema = determineSchema(op);
- if(atLeastOneCastNeeded(determinedSchema, s)) {
- return true;
- }
-
if(determinedSchema == null || determinedSchema.size() !=
s.size()) {
// we don't know what the data looks like, but the user has specified
- // that they want a certain number of fields loaded. We'll use a
- // projection (or pruning) to make sure the columns show up (with NULL
- // values) or are truncated from the right hand side of the input data.
+ // that they want a certain number of fields loaded.
+ return true;
+ }
+ if(atLeastOneCastNeeded(determinedSchema, s)) {
return true;
}
@@ -98,7 +90,7 @@ public abstract class TypeCastInserter e
private boolean atLeastOneCastNeeded(LogicalSchema determinedSchema,
LogicalSchema s) {
for (int i = 0; i < s.size(); i++) {
LogicalSchema.LogicalFieldSchema fs = s.getField(i);
- if (fs.type != DataType.BYTEARRAY && (determinedSchema == null
|| (!fs.isEqual(determinedSchema.getField(i))))) {
+ if (fs.type != DataType.BYTEARRAY &&
!fs.isEqual(determinedSchema.getField(i))) {
// we have to cast this field from the default BYTEARRAY type to
// whatever the user specified in the 'AS' clause of the LOAD
// statement (the fs.type).
@@ -120,64 +112,36 @@ public abstract class TypeCastInserter e
return;
}
- if(!atLeastOneCastNeeded(determinedSchema, s) && op instanceof
LOLoad) {
- // we're not going to insert any casts, but we might reduce or
increase
- // the number of columns coming out of the LOAD. If the loader
supports
- // it we'll use the 'requiredColumns' functionality rather
than bolting
- // on a FOREACH
- Set<Integer> required = new TreeSet<Integer>();
- for(int i = 0; i < s.size(); ++i) {
- // if we know the data source's schema, pick out the
columns we need,
- // otherwise take the first n
- int index = determinedSchema == null ? i :
determinedSchema.findField(s.getField(i).uid);
- if(index >= 0)
- required.add(index);
- }
-
- // pass the indices of the fields we need to a pruner, and
fire it off
- // so it configures the LOLoad (and the LoadFunc it contains)
- Map<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>>
requiredMap =
- new HashMap<LOLoad,
Pair<Map<Integer,Set<String>>,Set<Integer>>>(1);
- Pair<Map<Integer, Set<String>>, Set<Integer>> pair =
- new Pair<Map<Integer,Set<String>>, Set<Integer>>(null,
required);
- requiredMap.put((LOLoad) op, pair);
- new ColumnPruneVisitor(currentPlan, requiredMap ,
true).visit((LOLoad) op);
-
- // we only want to process this node once, so mark it:
- markCastNoNeed(op);
- return;
- }
-
// For every field, build a logical plan. If the field has a type
// other than byte array, then the plan will be cast(project). Else
// it will just be project.
LogicalPlan innerPlan = new LogicalPlan();
-
+
LOForEach foreach = new LOForEach(currentPlan);
foreach.setInnerPlan(innerPlan);
foreach.setAlias(op.getAlias());
// Insert the foreach into the plan and patch up the plan.
Operator next = currentPlan.getSuccessors(op).get(0);
currentPlan.insertBetween(op, foreach, next);
-
+
List<LogicalExpressionPlan> exps = new
ArrayList<LogicalExpressionPlan>();
LOGenerate gen = new LOGenerate(innerPlan, exps, new
boolean[s.size()]);
innerPlan.add(gen);
for (int i = 0; i < s.size(); i++) {
LogicalSchema.LogicalFieldSchema fs = s.getField(i);
-
+
LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
- innerPlan.add(innerLoad);
+ innerPlan.add(innerLoad);
innerPlan.connect(innerLoad, gen);
-
+
LogicalExpressionPlan exp = new LogicalExpressionPlan();
-
+
ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
exp.add(prj);
-
+
if (fs.type != DataType.BYTEARRAY && (determinedSchema == null
|| (!fs.isEqual(determinedSchema.getField(i))))) {
- // Either no schema was determined by loader OR the type
+ // Either no schema was determined by loader OR the type
// from the "determinedSchema" is different
// from the type specified - so we need to cast
CastExpression cast = new CastExpression(exp, prj, new
LogicalSchema.LogicalFieldSchema(fs));
@@ -187,7 +151,7 @@ public abstract class TypeCastInserter e
loadFuncSpec =
((LOLoad)op).getFileSpec().getFuncSpec();
} else if (op instanceof LOStream) {
StreamingCommand command =
((LOStream)op).getStreamingCommand();
- HandleSpec streamOutputSpec = command.getOutputSpec();
+ HandleSpec streamOutputSpec = command.getOutputSpec();
loadFuncSpec = new
FuncSpec(streamOutputSpec.getSpec());
} else {
String msg = "TypeCastInserter invoked with an invalid
operator class name: " + innerPlan.getClass().getSimpleName();
@@ -199,7 +163,7 @@ public abstract class TypeCastInserter e
}
markCastInserted(op);
}
-
+
@Override
public OperatorPlan reportChanges() {
return currentPlan;
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Apr 1 20:19:53 2016
@@ -355,7 +355,20 @@ TOKEN_MGR_DECLS : {
<SCHEMA_DEFINITION> MORE :
{
<"("> {tupleSchemaLevel++;}
-| <")"> {tupleSchemaLevel--; if ((tupleSchemaLevel == 0) && (bagSchemaLevel
== 0)) SwitchTo(prevState); }
+| <")">
+ {
+ if ((tupleSchemaLevel == 0) && (bagSchemaLevel == 0)) {
+ // This means parenthesis is not from this schema_def.
+ // Putting back ")" although others do not check parenthesis at this time.
+ // This is a bandaid workaround for the issue with inline-op which
+ // also uses parenthesis.
+ // Real fix would be to move out of using this javacc parser. (PIG-2597)
+ input_stream.backup(1);
+ image.deleteCharAt(image.length()-1);
+ SwitchTo(prevState);
+ }
+ tupleSchemaLevel--; if ((tupleSchemaLevel == 0) && (bagSchemaLevel ==
0)) SwitchTo(prevState);
+ }
| <"{"> {bagSchemaLevel++;}
| <"}"> {bagSchemaLevel--; if ((tupleSchemaLevel == 0) && (bagSchemaLevel ==
0)) SwitchTo(prevState); }
| <("," | ";" )>
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Apr 1 20:19:53 2016
@@ -23,6 +23,7 @@ import static org.apache.pig.tools.pigst
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.pig.PigCounters;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -286,7 +288,14 @@ public class TezVertexStats extends JobS
multiStoreCounters.putAll(msGroup);
}
+ // Split followed by union will have multiple stores writing to same location
+ Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
for (POStore sto : stores) {
+ POStoreTez store = (POStoreTez) sto;
+ uniqueOutputs.put(store.getOutputKey(), store);
+ }
+
+ for (POStore sto : uniqueOutputs.values()) {
if (sto.isTmpStore()) {
continue;
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Apr 1 20:19:53 2016
@@ -3209,7 +3209,9 @@ public class TestBuiltin {
String inputFileName = "testUniqueID.txt";
Util.createInputFile(cluster, inputFileName, new String[]
{"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
- PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
+ Properties copyproperties = new Properties();
+ copyproperties.putAll(cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(),
copyproperties);
pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size",
"10");
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination",
"true");
pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
@@ -3255,6 +3257,7 @@ public class TestBuiltin {
assertEquals(iter.next().get(1), "0-2");
assertEquals(iter.next().get(1), "0-3");
}
+ Util.deleteFile(cluster, inputFileName);
}
@Test
@@ -3263,7 +3266,10 @@ public class TestBuiltin {
String inputFileName = "testRANDOM.txt";
Util.createInputFile(cluster, inputFileName, new String[]
{"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
- PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
+
+ Properties copyproperties = new Properties();
+ copyproperties.putAll(cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(),
copyproperties);
// running with two mappers
pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size",
"10");
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination",
"true");
@@ -3290,6 +3296,7 @@ public class TestBuiltin {
for( int i = 0; i < 5; i++ ){
assertNotEquals(mapper1[i], mapper2[i], 0.0001);
}
+ Util.deleteFile(cluster, inputFileName);
}