Author: xuefu
Date: Fri Apr  1 20:19:53 2016
New Revision: 1737436

URL: http://svn.apache.org/viewvc?rev=1737436&view=rev
Log:
PIG-4855: Merge trunk[4] into spark branch (Pallavi via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/HKerberos.java
      - copied unchanged from r1737432, 
pig/trunk/src/org/apache/pig/backend/hadoop/HKerberos.java
    
pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
      - copied unchanged from r1737432, 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
Modified:
    pig/branches/spark/   (props changed)
    pig/branches/spark/CHANGES.txt
    pig/branches/spark/conf/pig.properties
    
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
    pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
    pig/branches/spark/src/org/apache/pig/PigConfiguration.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
    pig/branches/spark/src/org/apache/pig/impl/PigContext.java
    pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
    
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
    
pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
    pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
    pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
    
pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java
    pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java
    
pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
    pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java
    pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
    
pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
    pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushUpFilter.java
    pig/branches/spark/test/org/apache/pig/test/TestPigStorage.java
    pig/branches/spark/test/org/apache/pig/test/Util.java
    pig/branches/spark/test/org/apache/pig/test/data/DotFiles/explain1.dot
    pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java

Propchange: pig/branches/spark/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr  1 20:19:53 2016
@@ -1,2 +1,2 @@
 /hadoop/pig/branches/multiquery:741727-770826
-/pig/trunk:1621676-1733612
+/pig/trunk:1621676-1737432

Modified: pig/branches/spark/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/CHANGES.txt?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/CHANGES.txt (original)
+++ pig/branches/spark/CHANGES.txt Fri Apr  1 20:19:53 2016
@@ -24,6 +24,14 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4847: POPartialAgg processing and spill improvements (rohini)
+
+PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of 
no vertex groups (rohini)
+
+PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine 
plan (rohini)
+
+PIG-4796: Authenticate with Kerberos using a keytab file (nielsbasjes via 
daijy)
+
 PIG-4817: Bump HTTP Logparser to version 2.4 (nielsbasjes via daijy)
 
 PIG-4811: Upgrade groovy library to address MethodClosure vulnerability        
(daijy)
@@ -97,6 +105,18 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4851: Null not padded when input has less fields than declared schema for 
some loader (rohini)
+
+PIG-4850: Registered jars do not use submit replication (rdblue via cheolsoo)
+
+PIG-4845: Parallel instantiation of classes in Tez cause tasks to fail (rohini)
+
+PIG-4841: Inline-op with schema declaration fails with syntax error (knoguchi)
+
+PIG-4832: Fix TestPrumeColumn NPE failure (kellyzly via daijy)
+
+PIG-4833: TestBuiltin.testURIWithCurlyBrace in TEZ failing after PIG-4819 
(knoguchi)
+
 PIG-4819: RANDOM() udf can lead to missing or redundant records (knoguchi)
 
 PIG-4816: Read a null scalar causing a Tez failure (daijy)

Modified: pig/branches/spark/conf/pig.properties
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/conf/pig.properties?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/conf/pig.properties (original)
+++ pig/branches/spark/conf/pig.properties Fri Apr  1 20:19:53 2016
@@ -193,6 +193,18 @@
 #
 # pig.spill.gc.activation.size=40000000
 
+# Spill will be triggered if the fraction of Old Generation heap exceeds the 
usage or collection threshold. 
+# For bigger heap sizes, using a fixed size for collection and usage 
thresholds will
+# utilize memory better than a percentage of the heap.
+# So usage threshold is calculated as 
+#     Max(HeapSize * pig.spill.memory.usage.threshold.fraction, HeapSize - 
pig.spill.unused.memory.threshold.size)
+# Likewise, collection threshold is calculated as 
+#     Max(HeapSize * pig.spill.collection.threshold.fraction, HeapSize - 
pig.spill.unused.memory.threshold.size)
+
+# pig.spill.memory.usage.threshold.fraction=0.7
+# pig.spill.collection.threshold.fraction=0.7 
+# pig.spill.unused.memory.threshold.size=367001600
+
 # Maximum amount of data to replicate using the distributed cache when doing
 # fragment-replicated join. (default: 1000000000, about 1GB) Consider 
increasing
 # this in a production environment, but carefully.
@@ -331,6 +343,15 @@
 #
 # pig.exec.nocombiner=false
 
+# Enable or disable use of combiners only in reducer shuffle-merge phase. 
+# pig.exec.nocombiner turns off combiner for both map and reduce phases. 
+# Valid values are auto, true or false. Default is auto in which Pig turns off 
combiner
+# on per combine plan basis when bags are present in a particular plan.
+# Value of true or false will apply to all combine plans in the script.
+# Currently this only applies to Tez as Mapreduce does not run combiners in 
reducer (MAPREDUCE-5221).
+
+# pig.exec.nocombiner.reducer=auto
+
 # EXPERIMENTAL: Aggregate records in map task before sending to the combiner?
 # (default: false, 10; recommended: true, 10). In cases where there is a 
massive
 # reduction of data in the aggregation step, pig can do a first pass of

Modified: 
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
 (original)
+++ 
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
 Fri Apr  1 20:19:53 2016
@@ -29,17 +29,19 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.MiniCluster;
 import org.apache.pig.test.Util;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestCSVStorage {
     protected static final Log LOG = LogFactory.getLog(TestCSVStorage.class);
-    
+
     private PigServer pigServer;
     private MiniCluster cluster;
-    
+
     public TestCSVStorage() throws ExecException, IOException {
         cluster = MiniCluster.buildCluster();
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
@@ -59,8 +61,8 @@ public class TestCSVStorage {
         Iterator<Tuple> it = pigServer.openIterator("a");
         assertEquals(Util.createTuple(new String[] {"foo", "bar", "baz"}), 
it.next());
     }
-   
-    @Test 
+
+    @Test
     public void testQuotedCommas() throws IOException {
         String inputFileName = "TestCSVLoader-quotedcommas.txt";
         Util.createLocalInputFile(inputFileName, new String[] 
{"\"foo,bar,baz\"", "fee,foe,fum"});
@@ -71,11 +73,11 @@ public class TestCSVStorage {
         assertEquals(Util.createTuple(new String[] {"foo,bar,baz", null, 
null}), it.next());
         assertEquals(Util.createTuple(new String[] {"fee", "foe", "fum"}), 
it.next());
     }
-    
+
     @Test
     public void testQuotedQuotes() throws IOException {
         String inputFileName = "TestCSVLoader-quotedquotes.txt";
-        Util.createLocalInputFile(inputFileName, 
+        Util.createLocalInputFile(inputFileName,
                 new String[] {"\"foo,\"\"bar\"\",baz\"", "\"\"\"\"\"\"\"\""});
         String script = "a = load '" + inputFileName + "' using 
org.apache.pig.piggybank.storage.CSVLoader() " +
         "   as (a:chararray); ";
@@ -84,5 +86,21 @@ public class TestCSVStorage {
         assertEquals(Util.createTuple(new String[] {"foo,\"bar\",baz"}), 
it.next());
         assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
     }
-    
+
+    @Test
+    public void testNullPadding() throws IOException {
+        String inputFileName = "TestCSVLoader-nullpadding.txt";
+        Util.createLocalInputFile(inputFileName, new String[] { "a", "b,", 
"c,d", ",e"});
+        String script = "a = load '" + inputFileName + "' using 
org.apache.pig.piggybank.storage.CSVLoader() " +
+        "   as (field1, field2); dump a;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("a");
+        assertEquals(Util.createTuple(new DataByteArray[] {new 
DataByteArray("a"), null}), it.next());
+        assertEquals(Util.createTuple(new DataByteArray[] {new 
DataByteArray("b"), null}), it.next());
+        assertEquals(Util.createTuple(new DataByteArray[] {new 
DataByteArray("c"), new DataByteArray("d")}), it.next());
+        assertEquals(Util.createTuple(new DataByteArray[] {new 
DataByteArray(""), new DataByteArray("e")}), it.next());
+        Assert.assertFalse(it.hasNext());
+    }
+
+
 }

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml 
(original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Fri 
Apr  1 20:19:53 2016
@@ -325,6 +325,60 @@ $ pig hdfs://nn.mydomain.com:9020/myscri
 </section>
 
   <!-- ==================================================================== -->
+  <section id="kerberos">
+     <title>Running jobs on a Kerberos secured cluster</title>
+     <p>Kerberos is an authentication system that uses tickets with a limited 
validity time.<br/>
+        As a consequence running a pig script on a kerberos secured hadoop 
cluster limits the running time to at most
+        the remaining validity time of these kerberos tickets. When doing 
really complex analytics this may become a
+        problem as the job may need to run for a longer time than these ticket 
times allow.</p>
+
+     <section id="kerberos-short">
+     <title>Short lived jobs</title>
+     <p>When running short jobs all you need to do is ensure that the user has 
logged in to Kerberos via the
+        normal kinit method.<br/>
+        The Hadoop job will automatically pick up these credentials and the job 
will run fine.</p>
+     </section>
+
+     <section id="kerberos-long">
+     <title>Long lived jobs</title>
+     <p>A kerberos keytab file is essentially a Kerberos specific form of the 
password of a user. <br/>
+        It is possible to enable a Hadoop job to request new tickets when they 
expire by creating a keytab file and
+        making it part of the job that is running in the cluster.
+        This will extend the maximum job duration beyond the maximum renew 
time of the kerberos tickets.</p>
+     <p>Usage:</p>
+     <ol>
+        <li>Create a keytab file for the required principal.<br/>
+           Using the ktutil tool you can create a keytab using roughly these 
commands:<br/>
+           <source>addent -password -p niels@EXAMPLE.NL -k 1 -e rc4-hmac
+addent -password -p niels@EXAMPLE.NL -k 1 -e aes256-cts
+wkt niels.keytab</source>
+        </li>
+        <li>Set the following properties (either via the .pigrc file or on the 
command line via -P file)
+            <ul>
+            <li><em>java.security.krb5.conf</em><br/>
+                The path to the local krb5.conf file.<br/>
+                Usually this is "/etc/krb5.conf"</li>
+            <li><em>hadoop.security.krb5.principal</em><br/>
+                The principal you want to log in with.<br/>
+                Usually this would look like this "niels@EXAMPLE.NL"</li>
+            <li><em>hadoop.security.krb5.keytab</em><br/>
+                The path to the local keytab file that must be used to 
authenticate with.<br/>
+                Usually this would look like this 
"/home/niels/.krb/niels.keytab"</li>
+            </ul></li>
+     </ol>
+     <p><strong>NOTE:</strong> All paths in these variables are local to the 
client system starting the actual pig script.
+        This can be run without any special access to the cluster nodes.</p>
+
+         <p>Overall you would create a file that looks like this (assume we 
call it niels.kerberos.properties):</p>
+         <source>java.security.krb5.conf=/etc/krb5.conf
+hadoop.security.krb5.principal=niels@EXAMPLE.NL
+hadoop.security.krb5.keytab=/home/niels/.krb/niels.keytab</source>
+         <p>and start your script like this:</p>
+         <source>pig -P niels.kerberos.properties script.pig</source>
+     </section>
+  </section>
+
+  <!-- ==================================================================== -->
     
    <!-- PIG LATIN STATEMENTS -->
    <section id="pl-statements">

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Fri Apr  1 
20:19:53 2016
@@ -94,6 +94,13 @@ public class PigConfiguration {
     public static final String PIG_EXEC_NO_COMBINER = "pig.exec.nocombiner";
 
     /**
+     * Enable or disable use of combiners in reducer shuffle-merge phase in 
Tez.
+     * Valid values are auto, true or false.
+     * Default is auto which turns off combiner if bags are present in the 
combine plan
+     */
+    public static final String PIG_EXEC_NO_COMBINER_REDUCER = 
"pig.exec.nocombiner.reducer";
+
+    /**
      * This key controls whether secondary sort key is used for optimization 
in case
      * of nested distinct or sort
      */
@@ -328,7 +335,7 @@ public class PigConfiguration {
      * Set the threshold for percentage of errors
      */
     public static final String PIG_ERROR_THRESHOLD_PERCENT = 
"pig.error.threshold.percent";
-    
+
     /**
      * Comma-delimited entries of commands/operators that must be disallowed.
      * This is a security feature to be used by administrators to block use of
@@ -380,6 +387,29 @@ public class PigConfiguration {
     public static final String PIG_TEZ_DAG_STATUS_REPORT_INTERVAL = 
"pig.tez.dag.status.report.interval";
 
 
+    // SpillableMemoryManager settings
+
+    /**
+     * Spill will be triggered if the fraction of biggest heap exceeds the 
usage threshold.
+     * If {@link PigConfiguration#PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE} is 
non-zero, then usage threshold is calculated as
+     * Max(HeapSize * PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, HeapSize - 
PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE)
+     * Default is 0.7
+     */
+    public static final String PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION = 
"pig.spill.memory.usage.threshold.fraction";
+
+    /**
+     * Spill will be triggered if the fraction of biggest heap exceeds the 
collection threshold.
+     * If {@link PigConfiguration#PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE} is 
non-zero, then collection threshold is calculated as
+     * Max(HeapSize * PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, HeapSize - 
PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE)
+     * Default is 0.7
+     */
+    public static final String PIG_SPILL_COLLECTION_THRESHOLD_FRACTION = 
"pig.spill.collection.threshold.fraction";
+
+    /**
+     * Spill will be triggered when unused memory falls below the threshold.
+     * Default is 350MB
+     */
+    public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = 
"pig.spill.unused.memory.threshold.size";
 
     // Deprecated settings of Pig 0.13
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 Fri Apr  1 20:19:53 2016
@@ -37,6 +37,7 @@ import org.apache.pig.backend.BackendExc
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.HKerberos;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
@@ -215,6 +216,9 @@ public abstract class HExecutionEngine i
         // the properties
         Utils.recomputeProperties(jc, properties);
 
+        // Ensure we have been logged in using the kerberos keytab (if 
provided) before continuing.
+        HKerberos.tryKerberosKeytabLogin(jc);
+
         cluster = jc.get(MRConfiguration.JOB_TRACKER);
         nameNode = jc.get(FILE_SYSTEM_LOCATION);
         if (nameNode == null) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Fri Apr  1 20:19:53 2016
@@ -1783,6 +1783,7 @@ public class JobControlCompiler{
             PigContext pigContext,
             Configuration conf,
             URL url) throws IOException {
+        short replication = (short) 
conf.getInt(MRConfiguration.SUMIT_REPLICATION, 3);
 
         boolean cacheEnabled =
                 conf.getBoolean(PigConfiguration.PIG_USER_CACHE_ENABLED, 
false);
@@ -1809,6 +1810,7 @@ public class JobControlCompiler{
                 os.close();
             }
         }
+        fs.setReplication(dst, replication);
         return dst;
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 Fri Apr  1 20:19:53 2016
@@ -33,7 +33,6 @@ import org.apache.log4j.PropertyConfigur
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -161,7 +160,7 @@ public abstract class PigGenericMapBase
         super.setup(context);
 
         Configuration job = context.getConfiguration();
-        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+        SpillableMemoryManager.getInstance().configure(job);
         context.getConfiguration().set(PigConstants.TASK_INDEX, 
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
         PigMapReduce.sJobContext = context;
         PigMapReduce.sJobConfInternal.set(context.getConfiguration());

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 Fri Apr  1 20:19:53 2016
@@ -34,7 +34,6 @@ import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -313,7 +312,7 @@ public class PigGenericMapReduce {
             if (inIllustrator)
                 pack = getPack(context);
             Configuration jConf = context.getConfiguration();
-            
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+            SpillableMemoryManager.getInstance().configure(jConf);
             context.getConfiguration().set(PigConstants.TASK_INDEX, 
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
             sJobContext = context;
             sJobConfInternal.set(context.getConfiguration());

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Fri Apr  1 20:19:53 2016
@@ -491,6 +491,11 @@ public class POUserFunc extends Expressi
         }
     }
 
+    public Type getOriginalReturnType() throws ExecException {
+        instantiateFunc(origFSpec);
+        return func.getReturnType();
+    }
+
     public Type getReturnType() {
         return func.getReturnType();
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
 Fri Apr  1 20:19:53 2016
@@ -141,7 +141,7 @@ public class POPartialAgg extends Physic
     }
 
     private void init() throws ExecException {
-        ALL_POPARTS.put(this, null);
+
         numRecsInRawMap = 0;
         numRecsInProcessedMap = 0;
         minOutputReduction = DEFAULT_MIN_REDUCTION;
@@ -172,6 +172,9 @@ public class POPartialAgg extends Physic
             // Set them to true instead of adding another check for 
!disableMapAgg
             sizeReductionChecked = true;
             estimatedMemThresholds = true;
+        } else {
+            ALL_POPARTS.put(this, null);
+            SpillableMemoryManager.getInstance().registerSpillable(this);
         }
         // Avoid hashmap resizing. TODO: Investigate loadfactor of 0.90 or 1.0
         // newHashMapWithExpectedSize does new HashMap(expectedSize + 
expectedSize/3)
@@ -187,7 +190,6 @@ public class POPartialAgg extends Physic
             listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
         }
         initialized = true;
-        SpillableMemoryManager.getInstance().registerSpillable(this);
     }
 
     @Override
@@ -203,7 +205,7 @@ public class POPartialAgg extends Physic
         // The fact that we are in the latter stage is communicated via the 
doSpill
         // flag.
 
-        if (!initialized && !ALL_POPARTS.containsKey(this)) {
+        if (!initialized) {
             init();
         }
 
@@ -224,8 +226,12 @@ public class POPartialAgg extends Physic
                 if (doSpill == false) {
                     // SpillableMemoryManager requested a spill to reduce 
memory
                     // consumption. See if we can avoid it.
+                    // If we see that there are way more records in processed 
map than
+                    // raw map, it is better to spill after the aggregation
+                    boolean spillProcessedMap = (3 * numRecsInRawMap) < 
numRecsInProcessedMap;
                     aggregateBothLevels(false, false);
-                    if (shouldSpill()) {
+                    // Spill if numRecsInProcessedMap exceeded 3x numRecsInRawMap before aggregating, or if shouldSpill() says so
+                    if (spillProcessedMap || shouldSpill()) {
                         startSpill(false);
                     } else {
                         LOG.info("Avoided emitting records during spill memory 
call.");
@@ -360,6 +366,7 @@ public class POPartialAgg extends Physic
         // called and size reduction checked
         startSpill(false);
         disableMapAgg = true;
+        ALL_POPARTS.remove(this);
     }
 
     private boolean mapAggDisabled() {
@@ -379,19 +386,21 @@ public class POPartialAgg extends Physic
         return shouldAggregateSecondLevel();
     }
 
+    private void addKeySingleValToMap(Map<Object, List<Tuple>> map,
+            Object key, List<Tuple> inpTuple) throws ExecException {
+        List<Tuple> value = map.get(key);
+        if (value == null) {
+            map.put(key, inpTuple);
+        } else {
+            value.add(inpTuple.get(0));
+        }
+    }
+
     private void addKeyValToMap(Map<Object, List<Tuple>> map,
             Object key, Tuple inpTuple) throws ExecException {
         List<Tuple> value = map.get(key);
         if (value == null) {
-            if (isGroupAll) {
-                // Set exact array initial size to avoid array copies
-                // listSizeThreshold = numRecordsToSample before estimating 
memory
-                // thresholds and firstTierThreshold after memory estimation
-                int listSize = (map == rawInputMap) ? listSizeThreshold : 
Math.min(secondTierThreshold, MAX_LIST_SIZE);
-                value = new ArrayList<Tuple>(listSize);
-            } else {
-                value = new ArrayList<Tuple>();
-            }
+            value = createNewValueList(map);
             map.put(key, value);
         }
         value.add(inpTuple);
@@ -409,6 +418,20 @@ public class POPartialAgg extends Physic
         }
     }
 
+    private List<Tuple> createNewValueList(Map<Object, List<Tuple>> map) {
+        List<Tuple> value;
+        if (isGroupAll) {
+            // Set exact array initial size to avoid array copies
+            // listSizeThreshold = numRecordsToSample before estimating memory
+            // thresholds and firstTierThreshold after memory estimation
+            int listSize = (map == rawInputMap) ? listSizeThreshold : 
Math.min(secondTierThreshold + 1, MAX_LIST_SIZE);
+            value = new ArrayList<Tuple>(listSize);
+        } else {
+            value = new ArrayList<Tuple>();
+        }
+        return value;
+    }
+
     private void startSpill(boolean aggregate) throws ExecException {
         // If spillingIterator is null, we are already spilling and don't need 
to set up.
         if (spillingIterator != null) return;
@@ -452,13 +475,38 @@ public class POPartialAgg extends Physic
      * @throws ExecException
      */
     private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object, List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
+        boolean srcDestDifferent = (fromMap == toMap) ? false : true;
         Iterator<Map.Entry<Object, List<Tuple>>> iter = fromMap.entrySet().iterator();
         while (iter.hasNext()) {
             Map.Entry<Object, List<Tuple>> entry = iter.next();
-            Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
-            Result res = getOutput(entry.getKey(), valueTuple);
-            iter.remove();
-            addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
+            if (entry.getValue().size() == 1) {
+                // If fromMap and toMap are same (processedInputMap), then continue without any change
+                // If different (rawInputMap->processedInputMap), add directly to target and skip running through the valuePlans
+                if (srcDestDifferent) {
+                    iter.remove();
+                    addKeySingleValToMap(toMap, entry.getKey(), entry.getValue());
+                }
+            } else {
+                Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
+                Result res = getOutput(entry.getKey(), valueTuple);
+                if (srcDestDifferent) {
+                    // Remove from src and add to destination (rawInputMap->processedInputMap)
+                    iter.remove();
+                    addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
+                } else {
+                    // Update processedInputMap in place
+                    List<Tuple> value = entry.getValue();
+                    if (isGroupAll) {
+                        value.clear();
+                    } else {
+                        // Creating a new list to free more space instead of
+                        // calling clear as the same key might not repeat again
+                        value = createNewValueList(toMap);
+                        toMap.put(entry.getKey(), value);
+                    }
+                    value.add(getAggResultTuple(res.result));
+                }
+            }
             numEntriesInTarget++;
         }
         return numEntriesInTarget;
@@ -495,9 +543,7 @@ public class POPartialAgg extends Physic
             return;
         }
         int processedTuples = numRecsInProcessedMap;
-        Map<Object, List<Tuple>> newMap = Maps.newHashMapWithExpectedSize(processedInputMap.size());
-        numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
-        processedInputMap = newMap;
+        numRecsInProcessedMap = aggregate(processedInputMap, processedInputMap, 0);
         LOG.info("Aggregated " + processedTuples + " processed tuples to " + numRecsInProcessedMap + " tuples");
     }
 
@@ -643,14 +689,13 @@ public class POPartialAgg extends Physic
                 LOG.info("Spill triggered by SpillableMemoryManager, but previous spill call is still not processed. Skipping");
                 return 0;
             }
-            LOG.info("Spill triggered by SpillableMemoryManager");
             synchronized(spillLock) {
                 if (rawInputMap != null) {
-                    LOG.info("Memory usage: " + getMemorySize()
-                            + ". Raw map: num keys = " + rawInputMap.size()
-                            + ", num tuples = "+ numRecsInRawMap
-                            + ", Processed map: num keys = " + processedInputMap.size()
-                            + ", num tuples = "+ numRecsInProcessedMap );
+                    LOG.info("Spill triggered. Memory usage: " + getMemorySize()
+                            + ". Raw map: keys = " + rawInputMap.size()
+                            + ", tuples = "+ numRecsInRawMap
+                            + ", Processed map: keys = " + processedInputMap.size()
+                            + ", tuples = "+ numRecsInProcessedMap );
                 }
                 startedContingentSpill = false;
                 doContingentSpill = true;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Apr  1 20:19:53 2016
@@ -103,6 +103,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -507,8 +508,28 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
-        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+        UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+        out.setUserPayload(payLoad);
+
+        if (!combinePlan.isEmpty()) {
+            boolean noCombineInReducer = false;
+            String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
+            if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+                noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
+            } else {
+                noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
+            }
+            if (noCombineInReducer) {
+                log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
+                conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
+                conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
+                conf.unset("pig.combinePlan");
+                conf.unset("pig.combine.package");
+                conf.unset("pig.map.keytype");
+                payLoad = TezUtils.createUserPayloadFromConf(conf);
+            }
+        }
+        in.setUserPayload(payLoad);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri Apr  1 20:19:53 2016
@@ -55,6 +55,14 @@ public class TezResourceManager {
         return instance;
     }
 
+
+    /**
+     * This method is only used by test code to reset state.
+     */
+    public static void dropInstance() {
+        instance = null;
+    }
+
     public void init(PigContext pigContext, Configuration conf) throws IOException {
         if (!inited) {
             this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Apr  1 20:19:53 2016
@@ -18,8 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -68,10 +70,23 @@ public class MultiQueryOptimizerTez exte
                 return;
             }
 
+            // Using a list instead of set to have consistently ordered plans
             List<TezOperator> splittees = new ArrayList<TezOperator>();
             Set<TezOperator> mergedNonPackageInputSuccessors = new HashSet<TezOperator>();
 
+            // When there is a union successor with unsupported storefunc, those splittees
+            // can only be merged into the split if all the union members will be from the split
+            // This is to ensure that there are no vertex groups created with unsupported storefunc.
+            Map<TezOperator, Set<OperatorKey>> tentativeMergeUnionMembers = new HashMap<TezOperator, Set<OperatorKey>>();
+
             List<TezOperator> successors = getPlan().getSuccessors(tezOp);
+
+            Set<OperatorKey> splitterAndSuccessorKeys = new HashSet<OperatorKey>();
+            splitterAndSuccessorKeys.add(tezOp.getOperatorKey());
+            for (TezOperator successor : successors) {
+                splitterAndSuccessorKeys.add(successor.getOperatorKey());
+            }
+
             for (TezOperator successor : successors) {
                 List<TezOperator> predecessors = new ArrayList<TezOperator>(getPlan().getPredecessors(successor));
                 predecessors.remove(tezOp);
@@ -129,6 +144,7 @@ public class MultiQueryOptimizerTez exte
                 // Only in case of POPackage(POShuffleTezLoad) multiple inputs can be handled from a Split
                 Set<TezOperator> nonPackageInputSuccessors = new HashSet<TezOperator>();
                 boolean canMerge = true;
+                Set<TezOperator> successorUnsupportedStoreUnions = new HashSet<TezOperator>();
 
                 mergedSuccessors.addAll(successors);
                 for (TezOperator splittee : splittees) {
@@ -140,11 +156,21 @@ public class MultiQueryOptimizerTez exte
                     for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
                         if (succSuccessor.isUnion()) {
                             if (!(unionOptimizerOn &&
-                                    UnionOptimizer.isOptimizable(succSuccessor,
-                                            unionSupportedStoreFuncs,
-                                            unionUnsupportedStoreFuncs))) {
+                                    UnionOptimizer.isOptimizable(succSuccessor))) {
                                 toNotMergeSuccessors.add(succSuccessor);
                             } else {
+                                if (unionOptimizerOn && !UnionOptimizer.isOptimizableStoreFunc(succSuccessor,unionSupportedStoreFuncs,unionUnsupportedStoreFuncs)) {
+                                    // This optimization of using UnionOptimizer for unsupported storefunc
+                                    // is only good for one level of split and does not handle multiple level of split.
+                                    Set<OperatorKey> unionMembers = new HashSet<OperatorKey>(succSuccessor.getUnionMembers());
+                                    unionMembers.removeAll(splitterAndSuccessorKeys);
+                                    if(unionMembers.isEmpty()) {
+                                        successorUnsupportedStoreUnions.add(succSuccessor);
+                                    } else {
+                                        toNotMergeSuccessors.add(succSuccessor);
+                                        continue;
+                                    }
+                                }
                                 toMergeSuccessors.add(succSuccessor);
                                 List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
                                 if (unionSuccessors != null) {
@@ -187,11 +213,47 @@ public class MultiQueryOptimizerTez exte
 
                 mergedSuccessors.retainAll(toNotMergeSuccessors);
                 if (mergedSuccessors.isEmpty()) { // no shared edge after merge
-                    splittees.add(successor);
                     
mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
+                    if (successorUnsupportedStoreUnions.isEmpty()) {
+                        splittees.add(successor);
+                    } else {
+                        // If all other conditions were satisfied, but it had a successor union
+                        // with unsupported storefunc keep it in the tentative list
+                        for (TezOperator unionOp : successorUnsupportedStoreUnions) {
+                            Set<OperatorKey> tentativeSuccessors = tentativeMergeUnionMembers.get(unionOp);
+                            if (tentativeSuccessors == null) {
+                                tentativeSuccessors = new HashSet<OperatorKey>();
+                                tentativeMergeUnionMembers.put(unionOp, tentativeSuccessors);
+                            }
+                            tentativeSuccessors.add(successor.getOperatorKey());
+                        }
+                    }
                 }
             }
 
+            Set<TezOperator> spliteesToRemove = new HashSet<TezOperator>();
+
+            for (Entry<TezOperator, Set<OperatorKey>> entry : tentativeMergeUnionMembers.entrySet()) {
+                Set<OperatorKey> unionMembers = new HashSet<OperatorKey>(entry.getKey().getUnionMembers());
+                if (entry.getValue().containsAll(unionMembers)) {
+                    // If all the union members were tentative splittees then add them
+                    for (OperatorKey key : entry.getValue()) {
+                        TezOperator splittee = getPlan().getOperator(key);
+                        if (!splittees.contains(splittee)) {
+                            splittees.add(splittee);
+                        }
+                    }
+                } else {
+                    for (OperatorKey key : entry.getValue()) {
+                        spliteesToRemove.add(getPlan().getOperator(key));
+                    }
+                }
+            }
+
+            for (TezOperator op : spliteesToRemove) {
+                splittees.remove(op);
+            }
+
             if (splittees.size() == 0) {
                 return;
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Apr  1 20:19:53 2016
@@ -104,9 +104,7 @@ public class UnionOptimizer extends TezO
         this.unsupportedStoreFuncs = unsupportedStoreFuncs;
     }
 
-    public static boolean isOptimizable(TezOperator tezOp,
-            List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
-            throws VisitorException {
+    public static boolean isOptimizable(TezOperator tezOp) throws VisitorException {
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
             return false;
         }
@@ -116,6 +114,12 @@ public class UnionOptimizer extends TezO
         if (tezOp.isRankCounter()) {
             return false;
         }
+        return true;
+    }
+
+    public static boolean isOptimizableStoreFunc(TezOperator tezOp,
+            List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
+            throws VisitorException {
         if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
             List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
             for (POStoreTez store : stores) {
@@ -148,7 +152,7 @@ public class UnionOptimizer extends TezO
             return;
         }
 
-        if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
+        if (!isOptimizable(tezOp)) {
             return;
         }
 
@@ -156,34 +160,40 @@ public class UnionOptimizer extends TezO
         String scope = unionOp.getOperatorKey().scope;
         PhysicalPlan unionOpPlan = unionOp.plan;
 
-        // TODO: PIG-3856 Handle replicated join and skewed join sample.
-        // Replicate join small table/skewed join sample that was broadcast to union vertex
-        // now needs to be broadcast to all the union predecessors. How do we do that??
-        // Wait for shared edge and do it or write multiple times??
-        // For now don't optimize except in the case of Split where we need to write only once
-
         Set<OperatorKey> uniqueUnionMembers = new HashSet<OperatorKey>(unionOp.getUnionMembers());
         List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
         List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null ? null
                 : new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
 
+        if (uniqueUnionMembers.size() != 1) {
+
+            if (!isOptimizableStoreFunc(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
+                return;
+            }
 
-        if (successors != null && uniqueUnionMembers.size() > 1) {
-            for (TezOperator succ : successors) {
-                for (TezOperator pred : predecessors) {
-                    if (succ.inEdges.containsKey(pred.getOperatorKey())) {
-                        // Stop here, we cannot convert the node into vertex group
-                        // Otherwise, we will end up with a parallel edge between pred
-                        // and succ
-                        return;
+            if (successors != null) {
+                for (TezOperator succ : successors) {
+                    for (TezOperator pred : predecessors) {
+                        if (succ.inEdges.containsKey(pred.getOperatorKey())) {
+                            // Stop here, we cannot convert the node into vertex group
+                            // Otherwise, we will end up with a parallel edge between pred
+                            // and succ
+                            return;
+                        }
                     }
                 }
             }
+
+            // TODO: PIG-3856 Handle replicated join and skewed join sample.
+            // Replicate join small table/skewed join sample that was broadcast to union vertex
+            // now needs to be broadcast to all the union predecessors. How do we do that??
+            // Wait for shared edge and do it or write multiple times??
+            // For now don't optimize except in the case of Split where we need to write only once
+            if (predecessors.size() > unionOp.getUnionMembers().size()) {
+                return;
+            }
         }
-        if (predecessors.size() > unionOp.getUnionMembers().size()
-                && uniqueUnionMembers.size() != 1) {
-            return; // TODO: PIG-3856
-        }
+
         if (uniqueUnionMembers.size() == 1) {
             // We actually don't need VertexGroup in this case. The multiple
             // sub-plans of Split can write to same MROutput or the Tez LogicalOutput

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Fri Apr  1 20:19:53 2016
@@ -56,6 +56,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -128,6 +129,7 @@ public class PigProcessor extends Abstra
 
         UserPayload payload = getContext().getUserPayload();
         conf = TezUtils.createConfFromUserPayload(payload);
+        SpillableMemoryManager.getInstance().configure(conf);
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
         PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Apr  1 20:19:53 2016
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.util;
 
 import java.io.IOException;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -42,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -273,4 +275,28 @@ public class TezCompilerUtil {
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);
     }
 
+    public static boolean bagDataTypeInCombinePlan(PhysicalPlan combinePlan) throws ExecException {
+        PhysicalOperator lr = combinePlan.getLeaves().get(0);
+        POForEach fe = (POForEach) combinePlan.getPredecessors(lr).get(0);
+
+        // Hack. class.getTypeName() is only available in JDK8
+        Type dataBagType = new TOBAG().getReturnType();
+
+        List<PhysicalPlan> inputPlans = fe.getInputPlans();
+        for (PhysicalPlan inputPlan: inputPlans) {
+            PhysicalOperator leaf = inputPlan.getLeaves().get(0);
+            if (leaf.getResultType() == DataType.BAG) {
+                return true;
+            } else if (leaf instanceof POUserFunc) {
+                POUserFunc func = (POUserFunc) leaf;
+                // Return type of Intermediate func in combiner plan is always Tuple.
+                // Need to check original or Final EvalFunc return type
+                if (dataBagType.equals(func.getOriginalReturnType())) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Apr  1 20:19:53 2016
@@ -73,7 +73,6 @@ import org.apache.pig.bzip2r.Bzip2TextIn
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.CastUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -178,7 +177,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
         overwrite.setArgs(1);
         overwrite.setArgName("overwrite");
         validOptions.addOption(overwrite);
-        
+
         return validOptions;
     }
 
@@ -225,7 +224,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
                 if ("true".equalsIgnoreCase(value)) {
                     overwriteOutput = true;
                 }
-            }       
+            }
             dontLoadSchema = configuredOptions.hasOption("noschema");
             tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
             tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -302,31 +301,18 @@ LoadPushDown, LoadMetadata, StoreMetadat
             Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
                     new String[] {signature});
             String serializedSchema = p.getProperty(signature+".schema");
-            if (serializedSchema != null) {
-                try {
-                    schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
-                } catch (ParserException e) {
-                    mLog.error("Unable to parse serialized schema " + serializedSchema, e);
-                    // all bets are off - there's no guarantee that we'll return
-                    // either the fields in the data or the fields in the schema
-                    // the user specified (or required)
-                }
+            if (serializedSchema == null) return tup;
+            try {
+                schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
+            } catch (ParserException e) {
+                mLog.error("Unable to parse serialized schema " + serializedSchema, e);
+                // all bets are off - there's no guarantee that we'll return
+                // either the fields in the data or the fields in the schema
+                // the user specified (or required)
             }
         }
 
-        if (schema == null) {
-            // if the number of required fields are less than or equal to 
-            // the number of fields in the data then we're OK as we've already
-            // read only the required number of fields into the tuple. If 
-            // more fields are required than are in the data then we'll pad
-            // with nulls:
-            int numRequiredColumns = 0;
-            for (int i = 0; mRequiredColumns != null && i < mRequiredColumns.length; i++)
-                if(mRequiredColumns[i])
-                    ++numRequiredColumns;
-            for (int i = tup.size();i < numRequiredColumns; ++i)
-                tup.append(null);
-        } else {
+        if (schema != null) {
             ResourceFieldSchema[] fieldSchemas = schema.getFields();
             int tupleIdx = 0;
             // If some fields have been projected out, the tuple
@@ -338,7 +324,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
                     if (tupleIdx >= tup.size()) {
                         tup.append(null);
                     }
-                    
+
                     Object val = null;
                     if(tup.get(tupleIdx) != null){
                         byte[] bytes = ((DataByteArray) tup.get(tupleIdx)).get();

Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Fri Apr  1 20:19:53 2016
@@ -77,13 +77,13 @@ public class PigContext implements Seria
     private static final long serialVersionUID = 1L;
 
     private static final Log log = LogFactory.getLog(PigContext.class);
+    private static Object instantiationLock = new Object();
 
     public static final String JOB_NAME = "jobName";
     public static final String JOB_NAME_PREFIX= "PigLatin";
     public static final String JOB_PRIORITY = "jobPriority";
     public static final String PIG_CMD_ARGS_REMAINDERS = "pig.cmd.args.remainders";
 
-
     /* NOTE: we only serialize some of the stuff
      *
      *(to make it smaller given that it's not all needed on the Hadoop side,
@@ -729,26 +729,41 @@ public class PigContext implements Seria
             throw new RuntimeException("Cannot instantiate: " + className, ioe) ;
         }
 
-        try {
-            // Do normal instantiation
-            if (args != null && args.length > 0) {
-                Class paramTypes[] = new Class[args.length];
-                for (int i = 0; i < paramTypes.length; i++) {
-                    paramTypes[i] = String.class;
+        // OptionBuilder is not thread-safe and HBaseStorage, elephantbird SequenceFileConfig, etc
+        // use them in constructor. This leads to NoSuchMethodException, UnrecognizedOptionException etc
+        // when processor, inputs and outputs are initialized in parallel in Tez
+        synchronized (instantiationLock) {
+            try {
+                // Do normal instantiation
+                if (args != null && args.length > 0) {
+                    Class paramTypes[] = new Class[args.length];
+                    for (int i = 0; i < paramTypes.length; i++) {
+                        paramTypes[i] = String.class;
+                    }
+                    Constructor c = objClass.getConstructor(paramTypes);
+                    ret =  c.newInstance((Object[])args);
+                } else {
+                    ret = objClass.newInstance();
                 }
-                Constructor c = objClass.getConstructor(paramTypes);
-                ret =  c.newInstance((Object[])args);
-            } else {
-                ret = objClass.newInstance();
             }
-        }
-        catch(NoSuchMethodException nme) {
-            // Second chance. Try with var arg constructor
-            try {
-                Constructor c = objClass.getConstructor(String[].class);
-                Object[] wrappedArgs = new Object[1] ;
-                wrappedArgs[0] = args ;
-                ret =  c.newInstance(wrappedArgs);
+            catch(NoSuchMethodException nme) {
+                // Second chance. Try with var arg constructor
+                try {
+                    Constructor c = objClass.getConstructor(String[].class);
+                    Object[] wrappedArgs = new Object[1] ;
+                    wrappedArgs[0] = args ;
+                    ret =  c.newInstance(wrappedArgs);
+                }
+                catch(Throwable e){
+                    // bad luck
+                    StringBuilder sb = new StringBuilder();
+                    sb.append("could not instantiate '");
+                    sb.append(className);
+                    sb.append("' with arguments '");
+                    sb.append(Arrays.toString(args));
+                    sb.append("'");
+                    throw new RuntimeException(sb.toString(), e);
+                }
             }
             catch(Throwable e){
                 // bad luck
@@ -760,18 +775,8 @@ public class PigContext implements Seria
                 sb.append("'");
                 throw new RuntimeException(sb.toString(), e);
             }
+            return ret;
         }
-        catch(Throwable e){
-            // bad luck
-            StringBuilder sb = new StringBuilder();
-            sb.append("could not instantiate '");
-            sb.append(className);
-            sb.append("' with arguments '");
-            sb.append(Arrays.toString(args));
-            sb.append("'");
-            throw new RuntimeException(sb.toString(), e);
-        }
-        return ret;
     }
 
     public static Object instantiateFuncFromSpec(String funcSpec)  {

Modified: 
pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java 
Fri Apr  1 20:19:53 2016
@@ -27,7 +27,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Properties;
 
 import javax.management.Notification;
 import javax.management.NotificationEmitter;
@@ -36,6 +35,8 @@ import javax.management.openmbean.Compos
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigConfiguration;
 
 /**
  * This class Tracks the tenured pool and a list of Spillable objects. When 
memory gets low, this
@@ -50,6 +51,11 @@ public class SpillableMemoryManager impl
 
     private static final Log log = 
LogFactory.getLog(SpillableMemoryManager.class);
 
+    private static final int ONE_GB = 1024 * 1024 * 1024;
+    private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 
1024;
+    private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
+    private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
+
     private LinkedList<WeakReference<Spillable>> spillables = new 
LinkedList<WeakReference<Spillable>>();
     // References to spillables with size
     private LinkedList<SpillablePtr> spillablesSR = null;
@@ -68,13 +74,9 @@ public class SpillableMemoryManager impl
     // and between GC invocations
     private long accumulatedFreeSize = 0L;
 
-    // fraction of biggest heap for which we want to get
-    // "memory usage threshold exceeded" notifications
-    private double memoryThresholdFraction = 0.7;
-
-    // fraction of biggest heap for which we want to get
-    // "collection threshold exceeded" notifications
-    private double collectionMemoryThresholdFraction = 0.5;
+    private long memoryThresholdSize = 0L;
+
+    private long collectionThresholdSize = 0L;
 
     // log notification on usage threshold exceeded only the first time
     private boolean firstUsageThreshExceededLogged = false;
@@ -89,6 +91,8 @@ public class SpillableMemoryManager impl
 
     private volatile boolean blockRegisterOnSpill = false;
 
+    private MemoryPoolMXBean tenuredHeap;
+
     private static final SpillableMemoryManager manager = new 
SpillableMemoryManager();
 
     //@StaticDataCleanup
@@ -100,8 +104,6 @@ public class SpillableMemoryManager impl
     private SpillableMemoryManager() {
         
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this,
 null, null);
         List<MemoryPoolMXBean> mpbeans = 
ManagementFactory.getMemoryPoolMXBeans();
-        MemoryPoolMXBean tenuredHeap = null;
-        long tenuredHeapSize = 0;
         long totalSize = 0;
         for (MemoryPoolMXBean pool : mpbeans) {
             log.debug("Found heap (" + pool.getName() + ") of type " + 
pool.getType());
@@ -111,7 +113,6 @@ public class SpillableMemoryManager impl
                 // CMS Old Gen or "tenured" is the only heap that supports
                 // setting usage threshold.
                 if (pool.isUsageThresholdSupported()) {
-                    tenuredHeapSize = size;
                     tenuredHeap = pool;
                 }
             }
@@ -120,8 +121,39 @@ public class SpillableMemoryManager impl
         if (tenuredHeap == null) {
             throw new RuntimeException("Couldn't find heap");
         }
-        log.debug("Selected heap to monitor (" +
-            tenuredHeap.getName() + ")");
+
+        configureMemoryThresholds(MEMORY_THRESHOLD_FRACTION_DEFAULT,
+                COLLECTION_THRESHOLD_FRACTION_DEFAULT,
+                UNUSED_MEMORY_THRESHOLD_DEFAULT);
+
+    }
+
+    /**
+     * Configure thresholds for memory usage/collection threshold exceeded 
notifications.
+     * Uses memoryThresholdFraction and collectionMemoryThresholdFraction to 
configure thresholds
+     * for heap sizes less than 1GB and unusedMemoryThreshold for bigger heaps.
+     *
+     * @param memoryThresholdFraction
+     *            fraction of biggest heap for which we want to get memory 
usage
+     *            threshold exceeded notifications
+     * @param collectionMemoryThresholdFraction
+     *            fraction of biggest heap for which we want to get collection
+     *            threshold exceeded notifications
+     * @param unusedMemoryThreshold
+     *            Unused memory size below which we want to get notifications
+     */
+    private void configureMemoryThresholds(double memoryThresholdFraction, 
double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+        long tenuredHeapSize = tenuredHeap.getUsage().getMax();
+        memoryThresholdSize = (long)(tenuredHeapSize * 
memoryThresholdFraction);
+        collectionThresholdSize = (long)(tenuredHeapSize * 
collectionMemoryThresholdFraction);
+        if (unusedMemoryThreshold > 0) {
+            // For a 1G heap we will be spilling around ~700MB with 300MB 
still unused with default 0.7 threshold
+            // For bigger heaps, we still want to spill when there is 300MB 
unused (plus another 50MB for buffer) and not at 70%.
+            // For eg: For 4G we want to start spilling at 3.65GB and not at 
2.8GB(70%) for better use of memory
+            long unusedThreshold = tenuredHeapSize - unusedMemoryThreshold;
+            memoryThresholdSize = Math.max(memoryThresholdSize, 
unusedThreshold);
+            collectionThresholdSize = Math.max(collectionThresholdSize, 
unusedThreshold);
+        }
 
         // we want to set both collection and usage threshold alerts to be
         // safe. In some local tests after a point only collection threshold
@@ -133,31 +165,29 @@ public class SpillableMemoryManager impl
 
         /* We set the threshold to be 50% of tenured since that is where
          * the GC starts to dominate CPU time according to Sun doc */
-        tenuredHeap.setCollectionUsageThreshold((long)(tenuredHeapSize * 
collectionMemoryThresholdFraction));
+        
tenuredHeap.setCollectionUsageThreshold((long)(collectionThresholdSize));
         // we set a higher threshold for usage threshold exceeded notification
         // since this is more likely to be effective sooner and we do not
         // want to be spilling too soon
-        tenuredHeap.setUsageThreshold((long)(tenuredHeapSize * 
memoryThresholdFraction));
+
+        tenuredHeap.setUsageThreshold((long)(memoryThresholdSize));
+        log.info("Selected heap (" + tenuredHeap.getName() + ")" + " of size " 
+ tenuredHeapSize
+                + " to monitor. collectionUsageThreshold = " + 
tenuredHeap.getCollectionUsageThreshold()
+                + ", usageThreshold = " + tenuredHeap.getUsageThreshold() );
     }
 
     public static SpillableMemoryManager getInstance() {
         return manager;
     }
 
-    public static void configure(Properties properties) {
-
-        try {
+    public void configure(Configuration conf) {
 
-            spillFileSizeThreshold = Long.parseLong(
-                    properties.getProperty("pig.spill.size.threshold") ) ;
-
-            gcActivationSize = Long.parseLong(
-                    properties.getProperty("pig.spill.gc.activation.size") ) ;
-        }
-        catch (NumberFormatException  nfe) {
-            throw new RuntimeException("Error while converting system 
configurations" +
-                       "spill.size.threshold, spill.gc.activation.size", nfe) ;
-        }
+        spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", 
spillFileSizeThreshold);
+        gcActivationSize = conf.getLong("pig.spill.gc.activation.size", 
gcActivationSize);
+        double memoryThresholdFraction = 
conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, 
MEMORY_THRESHOLD_FRACTION_DEFAULT);
+        double collectionThresholdFraction = 
conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, 
COLLECTION_THRESHOLD_FRACTION_DEFAULT);
+        long unusedMemoryThreshold = 
conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE, 
UNUSED_MEMORY_THRESHOLD_DEFAULT);
+        configureMemoryThresholds(memoryThresholdFraction, 
collectionThresholdFraction, unusedMemoryThreshold);
     }
 
     @Override
@@ -169,12 +199,11 @@ public class SpillableMemoryManager impl
         // used - heapmax/2 + heapmax/4
         long toFree = 0L;
         
if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
-            long threshold = (long)(info.getUsage().getMax() * 
memoryThresholdFraction);
-            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            toFree = info.getUsage().getUsed() - collectionThresholdSize + 
(long)(collectionThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call- Usage threshold "
-                + info.getUsage();
+                + info.getUsage() + ", toFree = " + toFree;
             if(!firstUsageThreshExceededLogged){
                 log.info("first " + msg);
                 firstUsageThreshExceededLogged = true;
@@ -182,12 +211,11 @@ public class SpillableMemoryManager impl
                 log.debug(msg);
             }
         } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
-            long threshold = (long)(info.getUsage().getMax() * 
collectionMemoryThresholdFraction);
-            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            toFree = info.getUsage().getUsed() - memoryThresholdSize + 
(long)(memoryThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call - Collection threshold "
-                + info.getUsage();
+                + info.getUsage() + ", toFree = " + toFree;
             if(!firstCollectionThreshExceededLogged){
                 log.info("first " + msg);
                 firstCollectionThreshExceededLogged = true;

Modified: 
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
 Fri Apr  1 20:19:53 2016
@@ -18,18 +18,13 @@
 package org.apache.pig.newplan.logical.rules;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
-import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.expression.CastExpression;
@@ -58,7 +53,7 @@ public abstract class TypeCastInserter e
     public Transformer getNewTransformer() {
         return new TypeCastInserterTransformer();
     }
-    
+
     public class TypeCastInserterTransformer extends Transformer {
         @Override
         public boolean check(OperatorPlan matched) throws FrontendException {
@@ -77,18 +72,15 @@ public abstract class TypeCastInserter e
             }
 
             // Now that we've narrowed it down to an operation that *can* have 
casts added,
-            // (because the user specified some types which might not match 
the data) let's 
+            // (because the user specified some types which might not match 
the data) let's
             // see if they're actually needed:
             LogicalSchema determinedSchema = determineSchema(op);
-            if(atLeastOneCastNeeded(determinedSchema, s)) {
-                return true;
-            }
-
             if(determinedSchema == null || determinedSchema.size() != 
s.size()) {
                 // we don't know what the data looks like, but the user has 
specified
-                // that they want a certain number of fields loaded. We'll use 
a 
-                // projection (or pruning) to make sure the columns show up 
(with NULL
-                // values) or are truncated from the right hand side of the 
input data.
+                // that they want a certain number of fields loaded.
+                return true;
+            }
+            if(atLeastOneCastNeeded(determinedSchema, s)) {
                 return true;
             }
 
@@ -98,7 +90,7 @@ public abstract class TypeCastInserter e
         private boolean atLeastOneCastNeeded(LogicalSchema determinedSchema, 
LogicalSchema s) {
             for (int i = 0; i < s.size(); i++) {
                 LogicalSchema.LogicalFieldSchema fs = s.getField(i);
-                if (fs.type != DataType.BYTEARRAY && (determinedSchema == null 
|| (!fs.isEqual(determinedSchema.getField(i))))) {
+                if (fs.type != DataType.BYTEARRAY && 
!fs.isEqual(determinedSchema.getField(i))) {
                     // we have to cast this field from the default BYTEARRAY 
type to
                     // whatever the user specified in the 'AS' clause of the 
LOAD
                     // statement (the fs.type).
@@ -120,64 +112,36 @@ public abstract class TypeCastInserter e
                 return;
             }
 
-            if(!atLeastOneCastNeeded(determinedSchema, s) && op instanceof 
LOLoad) {
-                // we're not going to insert any casts, but we might reduce or 
increase
-                // the number of columns coming out of the LOAD. If the loader 
supports
-                // it we'll use the 'requiredColumns' functionality rather 
than bolting
-                // on a FOREACH
-                Set<Integer> required = new TreeSet<Integer>();
-                for(int i = 0; i < s.size(); ++i) {
-                    // if we know the data source's schema, pick out the 
columns we need,
-                    // otherwise take the first n
-                    int index = determinedSchema == null ? i : 
determinedSchema.findField(s.getField(i).uid);
-                    if(index >= 0)
-                        required.add(index);
-                }
-
-                // pass the indices of the fields we need to a pruner, and 
fire it off
-                // so it configures the LOLoad (and the LoadFunc it contains)
-                Map<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>> 
requiredMap = 
-                        new HashMap<LOLoad, 
Pair<Map<Integer,Set<String>>,Set<Integer>>>(1);
-                Pair<Map<Integer, Set<String>>, Set<Integer>> pair = 
-                        new Pair<Map<Integer,Set<String>>, Set<Integer>>(null, 
required);
-                requiredMap.put((LOLoad) op, pair);
-                new ColumnPruneVisitor(currentPlan, requiredMap , 
true).visit((LOLoad) op);
-
-                // we only want to process this node once, so mark it:
-                markCastNoNeed(op);
-                return;
-            }
-
             // For every field, build a logical plan.  If the field has a type
             // other than byte array, then the plan will be cast(project).  
Else
             // it will just be project.
             LogicalPlan innerPlan = new LogicalPlan();
-            
+
             LOForEach foreach = new LOForEach(currentPlan);
             foreach.setInnerPlan(innerPlan);
             foreach.setAlias(op.getAlias());
             // Insert the foreach into the plan and patch up the plan.
             Operator next = currentPlan.getSuccessors(op).get(0);
             currentPlan.insertBetween(op, foreach, next);
-            
+
             List<LogicalExpressionPlan> exps = new 
ArrayList<LogicalExpressionPlan>();
             LOGenerate gen = new LOGenerate(innerPlan, exps, new 
boolean[s.size()]);
             innerPlan.add(gen);
 
             for (int i = 0; i < s.size(); i++) {
                 LogicalSchema.LogicalFieldSchema fs = s.getField(i);
-                
+
                 LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
-                innerPlan.add(innerLoad);          
+                innerPlan.add(innerLoad);
                 innerPlan.connect(innerLoad, gen);
-                
+
                 LogicalExpressionPlan exp = new LogicalExpressionPlan();
-                
+
                 ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
                 exp.add(prj);
-                
+
                 if (fs.type != DataType.BYTEARRAY && (determinedSchema == null 
|| (!fs.isEqual(determinedSchema.getField(i))))) {
-                    // Either no schema was determined by loader OR the type 
+                    // Either no schema was determined by loader OR the type
                     // from the "determinedSchema" is different
                     // from the type specified - so we need to cast
                     CastExpression cast = new CastExpression(exp, prj, new 
LogicalSchema.LogicalFieldSchema(fs));
@@ -187,7 +151,7 @@ public abstract class TypeCastInserter e
                         loadFuncSpec = 
((LOLoad)op).getFileSpec().getFuncSpec();
                     } else if (op instanceof LOStream) {
                         StreamingCommand command = 
((LOStream)op).getStreamingCommand();
-                        HandleSpec streamOutputSpec = command.getOutputSpec(); 
+                        HandleSpec streamOutputSpec = command.getOutputSpec();
                         loadFuncSpec = new 
FuncSpec(streamOutputSpec.getSpec());
                     } else {
                         String msg = "TypeCastInserter invoked with an invalid 
operator class name: " + innerPlan.getClass().getSimpleName();
@@ -199,7 +163,7 @@ public abstract class TypeCastInserter e
             }
             markCastInserted(op);
         }
-        
+
         @Override
         public OperatorPlan reportChanges() {
             return currentPlan;

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj 
Fri Apr  1 20:19:53 2016
@@ -355,7 +355,20 @@ TOKEN_MGR_DECLS : {
 <SCHEMA_DEFINITION> MORE :
 {
     <"("> {tupleSchemaLevel++;}
-|   <")"> {tupleSchemaLevel--; if ((tupleSchemaLevel == 0) && (bagSchemaLevel 
== 0)) SwitchTo(prevState); }
+|   <")">
+    {
+        if ((tupleSchemaLevel == 0) && (bagSchemaLevel == 0)) {
+        // This means parenthesis is not from this schema_def.
+        // Putting back ")" although others do not check parenthesis at this 
time.
+        // This is a bandaid workaround for the issue with inline-op which
+        // also uses parenthesis.
+        // Real fix would be to move out of using this javacc parser. 
(PIG-2597)
+            input_stream.backup(1);
+            image.deleteCharAt(image.length()-1);
+            SwitchTo(prevState);
+        }
+        tupleSchemaLevel--; if ((tupleSchemaLevel == 0) && (bagSchemaLevel == 
0)) SwitchTo(prevState);
+    }
 |   <"{"> {bagSchemaLevel++;}
 |   <"}"> {bagSchemaLevel--; if ((tupleSchemaLevel == 0) && (bagSchemaLevel == 
0)) SwitchTo(prevState); }
 |      <("," | ";" )>

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java 
Fri Apr  1 20:19:53 2016
@@ -23,6 +23,7 @@ import static org.apache.pig.tools.pigst
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.pig.PigCounters;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -286,7 +288,14 @@ public class TezVertexStats extends JobS
             multiStoreCounters.putAll(msGroup);
         }
 
+        // Split followed by union will have multiple stores writing to same 
location
+        Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
         for (POStore sto : stores) {
+            POStoreTez store = (POStoreTez) sto;
+            uniqueOutputs.put(store.getOutputKey(), store);
+        }
+
+        for (POStore sto : uniqueOutputs.values()) {
             if (sto.isTmpStore()) {
                 continue;
             }

Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Apr  1 
20:19:53 2016
@@ -3209,7 +3209,9 @@ public class TestBuiltin {
         String inputFileName = "testUniqueID.txt";
         Util.createInputFile(cluster, inputFileName, new String[]
             {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
-        PigServer pigServer = new PigServer(cluster.getExecType(), 
cluster.getProperties());
+        Properties copyproperties = new Properties();
+        copyproperties.putAll(cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), 
copyproperties);
         
pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", 
"10");
         
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", 
"true");
         pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
@@ -3255,6 +3257,7 @@ public class TestBuiltin {
             assertEquals(iter.next().get(1), "0-2");
             assertEquals(iter.next().get(1), "0-3");
         }
+        Util.deleteFile(cluster, inputFileName);
     }
 
     @Test
@@ -3263,7 +3266,10 @@ public class TestBuiltin {
         String inputFileName = "testRANDOM.txt";
         Util.createInputFile(cluster, inputFileName, new String[]
             {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
-        PigServer pigServer = new PigServer(cluster.getExecType(), 
cluster.getProperties());
+
+        Properties copyproperties = new Properties();
+        copyproperties.putAll(cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), 
copyproperties);
         // running with two mappers
         
pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", 
"10");
         
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", 
"true");
@@ -3290,6 +3296,7 @@ public class TestBuiltin {
         for( int i = 0; i < 5; i++ ){
             assertNotEquals(mapper1[i], mapper2[i], 0.0001);
         }
+        Util.deleteFile(cluster, inputFileName);
     }
 
 



Reply via email to