Author: zly
Date: Wed Apr 12 02:20:20 2017
New Revision: 1791060

URL: http://svn.apache.org/viewvc?rev=1791060&view=rev
Log:
PIG-5215:Merge changes from review board to spark branch(Liyun)

Added:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
Modified:
    pig/branches/spark/CHANGES.txt
    pig/branches/spark/bin/pig
    pig/branches/spark/build.xml
    pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml
    pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
    pig/branches/spark/src/org/apache/pig/PigConfiguration.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
    pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
    
pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java
    
pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
    pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java
    
pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
    pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
    pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java
    pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
    pig/branches/spark/test/org/apache/pig/test/TestSchema.java
    pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
    pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/spark/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/CHANGES.txt?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/CHANGES.txt (original)
+++ pig/branches/spark/CHANGES.txt Wed Apr 12 02:20:20 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
  
 IMPROVEMENTS
 
+PIG-5110: Removing schema alias and :: coming from parent relation (szita via 
rohini)
+
 PIG-5085: Support FLATTEN of maps (szita via rohini)
 
 PIG-5126. Add doc about pig in zeppelin (zjffdu)
@@ -87,6 +89,10 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5173: Script with multiple splits fails with Invalid dag containing 0 
vertices (rohini)
+
+PIG-5159: Fix Pig not saving grunt history (szita via rohini)
+
 PIG-5127: Test fail when running test-core-mrtez (daijy)
 
 PIG-5083: CombinerPackager and LitePackager should not materialize bags 
(rohini)

Modified: pig/branches/spark/bin/pig
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/bin/pig?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/bin/pig (original)
+++ pig/branches/spark/bin/pig Wed Apr 12 02:20:20 2017
@@ -57,26 +57,19 @@ remaining=()
 includeHCatalog="";
 addJarString=-Dpig.additional.jars.uris\=;
 additionalJars="";
-preArg="";
-isSparkMode=1;
-isSparkLocalMode=1;
-
-#verifyMode(preArg, execType,expectExecType): check whether $preArg is "-x" or 
"-exectype", if yes 
-#continue to verify whether $execType is equals to $expectExecType
-function verifyMode(){
-  preArg=$1
-  execType=$2
-  expectExecType=$3
-  if [[ "$preArg" == "-x" || "$preArg" == "-exectype" ]]; then
+prevArgExecType=false;
+isSparkMode=false;
+isSparkLocalMode=false;
+
+#verify the execType is SPARK or SPARK_LOCAL or not
+function processExecType(){
+    execType=$1
     execTypeUpperCase=$(echo $execType |tr [a-z] [A-Z])
-    if [[ "$execTypeUpperCase" == "$expectExecType" ]]; then
-        echo 0
-    else 
-        echo 1
+    if [[ "$execTypeUpperCase" == "SPARK" ]]; then
+       isSparkMode=true
+    elif [[ "$execTypeUpperCase" == "SPARK_LOCAL" ]]; then
+       isSparkLocalMode=true
     fi
-  else
-    echo 1
-  fi
 }
 
 # filter command line parameter
@@ -92,16 +85,16 @@ for f in "$@"; do
         includeHCatalog=true;
       elif [[ "$includeHCatalog" == "true" && $f == $addJarString* ]]; then
         additionalJars=`echo $f | sed s/$addJarString//`
-      elif [[ $(verifyMode $preArg $f "SPARK") -eq 0 ]]; then
-        isSparkMode=0 
+      elif [[ "$f" == "-x" || "$f" == "-exectype" ]]; then
+        prevArgExecType=true;
         remaining[${#remaining[@]}]="$f"
-      elif [[ $(verifyMode $preArg $f "SPARK_LOCAL") -eq 0 ]]; then
-        isSparkLocalMode=0
+      elif [[ "$prevArgExecType" == "true" ]]; then
+        prevArgExecType=false;
+        processExecType $f
         remaining[${#remaining[@]}]="$f"
       else
         remaining[${#remaining[@]}]="$f"
      fi
-     preArg=$f
 done
 
 # resolve links - $0 may be a softlink
@@ -393,34 +386,34 @@ fi
 
 ################# ADDING SPARK DEPENDENCIES ##################
 # For spark_local mode:
-if [ $isSparkLocalMode -eq 0 ]; then
+if [ "$isSparkLocalMode" == "true" ]; then
 #SPARK_MASTER is forced to be "local" in spark_local mode
         SPARK_MASTER="local"
-       for f in $PIG_HOME/lib/spark/*.jar; do
-               CLASSPATH=${CLASSPATH}:$f;
-       done
+    for f in $PIG_HOME/lib/spark/*.jar; do
+            CLASSPATH=${CLASSPATH}:$f;
+    done
 fi
 
 # For spark mode:
 # Please specify SPARK_HOME first so that we can locate 
$SPARK_HOME/lib/spark-assembly*.jar,
 # we will add spark-assembly*.jar to the classpath.
-if [ $isSparkMode -eq 0 ]; then
-       if [ -z "$SPARK_HOME" ]; then
-          echo "Error: SPARK_HOME is not set!"  
-          exit 1
-       fi
-       
-       # Please specify SPARK_JAR which is the hdfs path of 
spark-assembly*.jar to allow YARN to cache spark-assembly*.jar on nodes so that 
it doesn't need to be distributed each time an application runs.
-       if [ -z "$SPARK_JAR" ]; then
-          echo "Error: SPARK_JAR is not set, SPARK_JAR stands for the hdfs 
location of spark-assembly*.jar. This allows YARN to cache spark-assembly*.jar 
on nodes so that it doesn't need to be distributed each time an application 
runs."  
-          exit 1
-       fi
-       
-       if [ -n "$SPARK_HOME" ]; then
-           echo "Using Spark Home: " ${SPARK_HOME}
-           SPARK_ASSEMBLY_JAR=`ls ${SPARK_HOME}/lib/spark-assembly*`
-           CLASSPATH=${CLASSPATH}:$SPARK_ASSEMBLY_JAR
-       fi
+if [ "$isSparkMode"  == "true" ]; then
+    if [ -z "$SPARK_HOME" ]; then
+       echo "Error: SPARK_HOME is not set!"
+       exit 1
+    fi
+
+    # Please specify SPARK_JAR which is the hdfs path of spark-assembly*.jar 
to allow YARN to cache spark-assembly*.jar on nodes so that it doesn't need to 
be distributed each time an application runs.
+    if [ -z "$SPARK_JAR" ]; then
+       echo "Error: SPARK_JAR is not set, SPARK_JAR stands for the hdfs 
location of spark-assembly*.jar. This allows YARN to cache spark-assembly*.jar 
on nodes so that it doesn't need to be distributed each time an application 
runs."
+       exit 1
+    fi
+
+    if [ -n "$SPARK_HOME" ]; then
+        echo "Using Spark Home: " ${SPARK_HOME}
+        SPARK_ASSEMBLY_JAR=`ls ${SPARK_HOME}/lib/spark-assembly*`
+        CLASSPATH=${CLASSPATH}:$SPARK_ASSEMBLY_JAR
+    fi
 fi
 ################# ADDING SPARK DEPENDENCIES ##################
 

Modified: pig/branches/spark/build.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Wed Apr 12 02:20:20 2017
@@ -260,8 +260,10 @@
     <property name="build.dir" location="build" />
     <property name="build.ivy.dir" location="${build.dir}/ivy" />
     <property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" />
-    <property name="build.ivy.spark.lib.dir" 
location="${build.ivy.dir}/lib/spark" />
     <property name="ivy.lib.dir" 
location="${build.ivy.lib.dir}/${ant.project.name}"/>
+    <!-- if we set ivy.lib.dir.spark as "${ivy.lib.dir}/spark", 
org.apache.pig.test.TestRegisteredJarVisibility.testRegisterJarOverridePigJarPackages
 will fail  -->
+    <!--<property name="ivy.lib.dir.spark" location="${ivy.lib.dir}/spark" 
/>-->
+    <property name="ivy.lib.dir.spark" location="${build.ivy.dir}/lib/spark" />
     <property name="build.ivy.report.dir" location="${build.ivy.dir}/report" />
     <property name="build.ivy.maven.dir" location="${build.ivy.dir}/maven" />
     <property name="pom.xml" location="${build.ivy.maven.dir}/pom.xml"/>
@@ -331,7 +333,7 @@
             <fileset dir="${ivy.lib.dir}">
                 <include name="**.*jar"/>
             </fileset>
-            <fileset dir="${build.ivy.spark.lib.dir}">
+            <fileset dir="${ivy.lib.dir.spark}">
                 <include name="**.*jar"/>
             </fileset>
         </path>
@@ -364,7 +366,7 @@
     <path id="classpath">
         <fileset file="${ivy.lib.dir}/${zookeeper.jarfile}"/>
         <fileset dir="${ivy.lib.dir}" includes="*.jar"/>
-        <fileset dir="${build.ivy.spark.lib.dir}" includes="*.jar"/>
+        <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/>
     </path>
 
     <!-- javadoc-classpath -->
@@ -734,7 +736,7 @@
     <target name="copySparkDependencies">
         <mkdir dir="${spark.lib.dir}" />
         <copy todir="${spark.lib.dir}">
-            <fileset dir="${build.ivy.spark.lib.dir}" includes="*.jar"/>
+            <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/>
         </copy>
     </target>
     
@@ -876,7 +878,7 @@
         <macro-test-runner test.file="${test.all.file}" 
tests.failed="test-tez.failed"/>
         <fail if="test-tez.failed">Tests failed!</fail>
     </target>
-       
+
     <target name="test-spark" 
depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download"
 description="Run Spark unit tests in Spark cluster-local mode">
         <macro-test-runner test.file="${test.all.file}" 
tests.failed="test-spark.failed"/>
         <fail if="test-spark.failed">Tests failed!</fail>
@@ -1003,9 +1005,9 @@
         <ant dir="${test.e2e.dir}" target="test-tez"/>
     </target>
 
-       <target name="test-e2e-spark" depends="jar, piggybank" description="run 
end-to-end tests in spark mode">
-               <ant dir="${test.e2e.dir}" target="test-spark"/>
-       </target>
+    <target name="test-e2e-spark" depends="jar, piggybank" description="run 
end-to-end tests in spark mode">
+            <ant dir="${test.e2e.dir}" target="test-spark"/>
+    </target>
 
     <target name="test-e2e-deploy" depends="jar" description="deploy 
end-to-end tests to existing cluster">
         <ant dir="${test.e2e.dir}" target="deploy"/>
@@ -1654,7 +1656,7 @@
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" 
log="${loglevel}"
                  
pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" 
log="${loglevel}"
-                 
pattern="${build.ivy.spark.lib.dir}/[artifact]-[revision](-[classifier]).[ext]" 
conf="spark"/>
+                 
pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" 
conf="spark"/>
        <ivy:cachepath pathid="compile.classpath" conf="compile"/>
      </target>
 

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml 
(original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml Wed 
Apr 12 02:20:20 2017
@@ -5409,7 +5409,9 @@ DUMP X;
 <section id="disambiguate">
 <title>Disambiguate Operator</title>
 
-<p>Use the disambiguate operator ( :: ) to identify field names after JOIN, 
COGROUP, CROSS, or FLATTEN operators.</p>
+<p>After JOIN, COGROUP, CROSS, or FLATTEN operations, the field names have the 
orginial alias and the disambiguate
+   operator ( :: ) prepended in the schema. The disambiguate operator is used 
to identify field names in case there
+   is a ambiguity.</p>
 
 <p>In this example, to disambiguate y,  use A::y or B::y.  In cases where 
there is no ambiguity, such as z, the :: is not necessary but is still 
supported.</p>
 
@@ -5417,8 +5419,14 @@ DUMP X;
 A = load 'data1' as (x, y);
 B = load 'data2' as (x, y, z);
 C = join A by x, B by x;
-D = foreach C generate y; -- which y?
+D = foreach C generate A::y, z; -- Cannot simply refer to y as it can refer to 
A::y or B::y
 </source>
+<p> In cases where the schema is stored as part of the StoreFunc like 
PigStorage, JsonStorage, AvroStorage or OrcStorage,
+   users generally have to use an extra FOREACH before STORE to rename the 
field names and remove the disambiguate
+   operator from the names. To automatically remove the disambiguate operator 
from the schema for the STORE operation,
+   the <i>pig.store.schema.disambiguate</i> Pig property can be set to 
"false". It is the responsibility of the user
+   to make sure that there is no conflict in the field names when using this 
setting.
+</p>
 </section>
 
     <!-- =================================================================== 
-->  
@@ -5444,7 +5452,7 @@ D = foreach C generate y; -- which y?
       to bags. For example, if we apply the expression GENERATE $0, 
FLATTEN($1) to the input tuple (a, m[k1#1, k2#2, k3#3]),
       we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
    </p>
-   
+
    <p>Also note that the flatten of empty bag will result in that row being 
discarded; no output is generated. 
    (See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
    
@@ -6537,7 +6545,7 @@ B = FOREACH A GENERATE a, FLATTEN(m);
 C = FILTER B by m::value == 5;
 ……
 </source>
-   
+
    </section>
    
    <section id="nestedblock">

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml 
(original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Wed 
Apr 12 02:20:20 2017
@@ -26,45 +26,45 @@
 
 <!-- SET UP PIG -->
  <section>
-               <title>Pig Setup</title>
-       
+        <title>Pig Setup</title>
+    
 <!-- ++++++++++++++++++++++++++++++++++ -->
  <section id="req">
  <title>Requirements</title>
  <p><strong>Mandatory</strong></p>
       <p>Unix and Windows users need the following:</p>
-               <ul>
-                 <li> <strong>Hadoop 2.X</strong> - <a 
href="http://hadoop.apache.org/common/releases.html";>http://hadoop.apache.org/common/releases.html</a>
 (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to 
point to the directory where you have installed Hadoop. If you do not set 
HADOOP_HOME, by default Pig will run with the embedded version, currently 
Hadoop 2.7.3.)</li>
-                 <li> <strong>Java 1.7</strong> - <a 
href="http://java.sun.com/javase/downloads/index.jsp";>http://java.sun.com/javase/downloads/index.jsp</a>
 (set JAVA_HOME to the root of your Java installation)</li>       
-               </ul>
-               <p></p>
+        <ul>
+          <li> <strong>Hadoop 2.X</strong> - <a 
href="http://hadoop.apache.org/common/releases.html";>http://hadoop.apache.org/common/releases.html</a>
 (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to 
point to the directory where you have installed Hadoop. If you do not set 
HADOOP_HOME, by default Pig will run with the embedded version, currently 
Hadoop 2.7.3.)</li>
+          <li> <strong>Java 1.7</strong> - <a 
href="http://java.sun.com/javase/downloads/index.jsp";>http://java.sun.com/javase/downloads/index.jsp</a>
 (set JAVA_HOME to the root of your Java installation)</li>    
+        </ul>
+        <p></p>
  <p><strong>Optional</strong></p>
-               <ul>
+         <ul>
           <li> <strong>Python 2.7</strong> - <a 
href="http://jython.org/downloads.html";>https://www.python.org</a> (when using 
Streaming Python UDFs) </li>
           <li> <strong>Ant 1.8</strong> - <a 
href="http://ant.apache.org/";>http://ant.apache.org/</a> (for builds) </li>
-               </ul>
+        </ul>
  
   </section>         
    
 <!-- ++++++++++++++++++++++++++++++++++ -->        
  <section id="download">
  <title>Download Pig</title>
-       <p>To get a Pig distribution, do the following:</p>
-       
-       <ol>
-       <li>Download a recent stable release from one of the Apache Download 
Mirrors 
-       (see <a href="http://hadoop.apache.org/pig/releases.html";> Pig 
Releases</a>).</li>
-       
+    <p>To get a Pig distribution, do the following:</p>
+    
+    <ol>
+    <li>Download a recent stable release from one of the Apache Download 
Mirrors 
+    (see <a href="http://hadoop.apache.org/pig/releases.html";> Pig 
Releases</a>).</li>
+    
     <li>Unpack the downloaded Pig distribution, and then note the following:
-           <ul>
-           <li>The Pig script file, pig, is located in the bin directory 
(/pig-n.n.n/bin/pig). 
-           The Pig environment variables are described in the Pig script 
file.</li>
-           <li>The Pig properties file, pig.properties, is located in the conf 
directory (/pig-n.n.n/conf/pig.properties). 
-           You can specify an alternate location using the PIG_CONF_DIR 
environment variable.</li>
-       </ul>   
-       </li>
-       <li>Add /pig-n.n.n/bin to your path. Use export (bash,sh,ksh) or setenv 
(tcsh,csh). For example: <br></br>
-       <code>$ export PATH=/&lt;my-path-to-pig&gt;/pig-n.n.n/bin:$PATH</code>
+        <ul>
+        <li>The Pig script file, pig, is located in the bin directory 
(/pig-n.n.n/bin/pig). 
+        The Pig environment variables are described in the Pig script 
file.</li>
+        <li>The Pig properties file, pig.properties, is located in the conf 
directory (/pig-n.n.n/conf/pig.properties). 
+        You can specify an alternate location using the PIG_CONF_DIR 
environment variable.</li>
+    </ul>    
+    </li>
+    <li>Add /pig-n.n.n/bin to your path. Use export (bash,sh,ksh) or setenv 
(tcsh,csh). For example: <br></br>
+    <code>$ export PATH=/&lt;my-path-to-pig&gt;/pig-n.n.n/bin:$PATH</code>
 </li>
 <li>
 Test the Pig installation with this simple command: <code>$ pig -help</code>
@@ -78,10 +78,10 @@ Test the Pig installation with this simp
 <title>Build Pig</title>
       <p>To build pig, do the following:</p>
      <ol>
-         <li> Check out the Pig code from SVN: <code>svn co 
http://svn.apache.org/repos/asf/pig/trunk</code> </li>
-         <li> Build the code from the top directory: <code>ant</code> <br></br>
-         If the build is successful, you should see the pig.jar file created 
in that directory. </li>  
-         <li> Validate the pig.jar  by running a unit test: <code>ant 
test</code></li>
+      <li> Check out the Pig code from SVN: <code>svn co 
http://svn.apache.org/repos/asf/pig/trunk</code> </li>
+      <li> Build the code from the top directory: <code>ant</code> <br></br>
+      If the build is successful, you should see the pig.jar file created in 
that directory. </li>    
+      <li> Validate the pig.jar  by running a unit test: <code>ant 
test</code></li>
      </ol>
  </section>
 </section>
@@ -90,37 +90,37 @@ Test the Pig installation with this simp
     
    <!-- RUNNING PIG  -->
    <section id="run">
-       <title>Running Pig </title>
-       <p>You can run Pig (execute Pig Latin statements and Pig commands) 
using various modes.</p>
-       <table>
-       <tr>
-       <td></td>
+    <title>Running Pig </title>
+    <p>You can run Pig (execute Pig Latin statements and Pig commands) using 
various modes.</p>
+    <table>
+    <tr>
+    <td></td>
     <td><strong>Local Mode</strong></td>
     <td><strong>Tez Local Mode</strong></td>
     <td><strong>Spark Local Mode</strong></td>
     <td><strong>Mapreduce Mode</strong></td>
     <td><strong>Tez Mode</strong></td>
     <td><strong>Spark Mode</strong></td>
-       </tr>
-       <tr>
-       <td><strong>Interactive Mode </strong></td>
+    </tr>
+    <tr>
+    <td><strong>Interactive Mode </strong></td>
     <td>yes</td>
     <td>experimental</td>
     <td>yes</td>
     <td>yes</td>
-       </tr>
-       <tr>
-       <td><strong>Batch Mode</strong> </td>
+    </tr>
+    <tr>
+    <td><strong>Batch Mode</strong> </td>
     <td>yes</td>
     <td>experimental</td>
     <td>yes</td>
     <td>yes</td>
-       </tr>
-       </table>
+    </tr>
+    </table>
 
-       <!-- ++++++++++++++++++++++++++++++++++ -->
-          <section id="execution-modes">
-       <title>Execution Modes</title>
+    <!-- ++++++++++++++++++++++++++++++++++ -->
+       <section id="execution-modes">
+    <title>Execution Modes</title>
 <p>Pig has six execution modes or exectypes: </p>
 <ul>
 <li><strong>Local Mode</strong> - To run Pig in local mode, you need access to 
a single machine; all files are installed and run using your local host and 
file system. Specify local mode using the -x flag (pig -x local).
@@ -129,13 +129,13 @@ Test the Pig installation with this simp
 <p><strong>Note:</strong> Tez local mode is experimental. There are some 
queries which just error out on bigger data in local mode.</p>
 </li>
 <li><strong>Spark Local Mode</strong> - To run Pig in spark local mode. It is 
similar to local mode, except internally Pig will invoke spark runtime engine. 
Specify Spark local mode using the -x flag (pig -x spark_local).
-<p><strong>Note:</strong> Spark local mode is experimental. There are some 
queries which just error out on bigge data in local mode.</p>
+<p><strong>Note:</strong> Spark local mode is experimental. There are some 
queries which just error out on bigger data in local mode.</p>
 </li>
 <li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need 
access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default 
mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR 
pig -x mapreduce).
 </li>
 <li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access to a 
Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x 
tez).
 </li>
-<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to 
a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using 
the -x flag (-x spark). In Spark execution mode, it is necessary to set 
env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client - 
yarn-client mode, mesos://host:port - spark on mesos or spark://host:port - 
spark cluster. For more information refer to spark documentation on Master 
Urls, <em>yarn-cluster mode is currently not supported</em>). Pig scripts run 
on Spark can take advantage of the <a 
href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation";>dynamic
 allocation</a> feature. The feature can be enabled by simply enabling 
<em>spark.dynamicAllocation.enabled</em>. Refer to spark <a 
href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation";>configuration</a>
 for additional configuration details. In general all properties in the pig 
script prefixed with
  <em>spark.</em> are copied to the Spark Application Configuration. Please 
note that Yarn auxillary service need to be enabled on Spark for this to work. 
See Spark documentation for additional details.
+<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to 
a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using 
the -x flag (-x spark). In Spark execution mode, it is necessary to set 
env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client - 
yarn-client mode, mesos://host:port - spark on mesos or spark://host:port - 
spark cluster. For more information refer to spark documentation on Master 
URLs, <em>yarn-cluster mode is currently not supported</em>). Pig scripts run 
on Spark can take advantage of the <a 
href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation";>dynamic
 allocation</a> feature. The feature can be enabled by simply enabling 
<em>spark.dynamicAllocation.enabled</em>. Refer to spark <a 
href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation";>configuration</a>
 for additional configuration details. In general all properties in the pig 
script prefixed with
  <em>spark.</em> are copied to the Spark Application Configuration. Please 
note that Yarn auxillary service need to be enabled on Spark for this to work. 
See Spark documentation for additional details.
 </li>
 </ul>
 <p></p>
@@ -289,16 +289,16 @@ $ pig -x spark id.pig
 
    <!-- PIG SCRIPTS -->
    <section id="pig-scripts">
-       <title>Pig Scripts</title>
-       
-<p>Use Pig scripts to place Pig Latin statements and Pig commands in a single 
file. While not required, it is good practice to identify the file using the 
*.pig extension.</p>        
-       
+    <title>Pig Scripts</title>
+
+<p>Use Pig scripts to place Pig Latin statements and Pig commands in a single 
file. While not required, it is good practice to identify the file using the 
*.pig extension.</p>
+
 <p>You can run Pig scripts from the command line and from the Grunt shell
 (see the <a href="cmds.html#run">run</a> and <a href="cmds.html#exec">exec</a> 
commands). </p>
-       
+
 <p>Pig scripts allow you to pass values to parameters using <a 
href="cont.html#Parameter-Sub">parameter substitution</a>. </p>
 
-<!-- +++++++++++++++++++++++++++++++++++++++++++ -->   
+<!-- +++++++++++++++++++++++++++++++++++++++++++ -->    
    <p id="comments"><strong>Comments in Scripts</strong></p>
    
    <p>You can include comments in Pig scripts:</p>
@@ -320,8 +320,8 @@ A = LOAD 'student' USING PigStorage() AS
 B = FOREACH A GENERATE name;  -- transforming data
 DUMP B;  -- retrieving results
 </source>   
-       
-<!-- +++++++++++++++++++++++++++++++++++++++++++ -->           
+    
+<!-- +++++++++++++++++++++++++++++++++++++++++++ -->        
 
 <p id="dfs"><strong>Scripts and Distributed File Systems</strong></p>
 
@@ -329,7 +329,7 @@ DUMP B;  -- retrieving results
 <source>
 $ pig hdfs://nn.mydomain.com:9020/myscripts/script.pig
 </source> 
-</section>     
+</section>    
 </section>
 </section>
 
@@ -391,7 +391,7 @@ hadoop.security.krb5.keytab=/home/niels/
     
    <!-- PIG LATIN STATEMENTS -->
    <section id="pl-statements">
-       <title>Pig Latin Statements</title>     
+    <title>Pig Latin Statements</title>    
    <p>Pig Latin statements are the basic constructs you use to process data 
using Pig. 
    A Pig Latin statement is an operator that takes a <a 
href="basic.html#relations">relation</a> as input and produces another relation 
as output. 
    (This definition applies to all Pig Latin operators except LOAD and STORE 
which read data from and write data to the file system.) 
@@ -532,20 +532,20 @@ However, in a production environment you
 <p></p>
 <p id="pig-properties">To specify Pig properties use one of these 
mechanisms:</p>
 <ul>
-       <li>The pig.properties file (add the directory that contains the 
pig.properties file to the classpath)</li>
-       <li>The -D and a Pig property in PIG_OPTS environment variable (export 
PIG_OPTS=-Dpig.tmpfilecompression=true)</li>
-       <li>The -P command line option and a properties file (pig -P 
mypig.properties)</li>
-       <li>The <a href="cmds.html#set">set</a> command (set 
pig.exec.nocombiner true)</li>
+    <li>The pig.properties file (add the directory that contains the 
pig.properties file to the classpath)</li>
+    <li>The -D and a Pig property in PIG_OPTS environment variable (export 
PIG_OPTS=-Dpig.tmpfilecompression=true)</li>
+    <li>The -P command line option and a properties file (pig -P 
mypig.properties)</li>
+    <li>The <a href="cmds.html#set">set</a> command (set pig.exec.nocombiner 
true)</li>
 </ul>
 <p><strong>Note:</strong> The properties file uses standard Java property file 
format.</p>
 <p>The following precedence order is supported: pig.properties &lt; -D Pig 
property &lt; -P properties file &lt; set command. This means that if the same 
property is provided using the –D command line option as well as the –P 
command line option (properties file), the value of the property in the 
properties file will take precedence.</p>
 
 <p id="hadoop-properties">To specify Hadoop properties you can use the same 
mechanisms:</p>
 <ul>
-       <li>Hadoop configuration files (include 
pig-cluster-hadoop-site.xml)</li>
-       <li>The -D and a Hadoop property in PIG_OPTS environment variable 
(export PIG_OPTS=–Dmapreduce.task.profile=true) </li>
-       <li>The -P command line option and a property file (pig -P 
property_file)</li>
-       <li>The <a href="cmds.html#set">set</a> command (set 
mapred.map.tasks.speculative.execution false)</li>
+    <li>Hadoop configuration files (include pig-cluster-hadoop-site.xml)</li>
+    <li>The -D and a Hadoop property in PIG_OPTS environment variable (export 
PIG_OPTS=–Dmapreduce.task.profile=true) </li>
+    <li>The -P command line option and a property file (pig -P 
property_file)</li>
+    <li>The <a href="cmds.html#set">set</a> command (set 
mapred.map.tasks.speculative.execution false)</li>
 </ul>
 <p></p>
 <p>The same precedence holds: Hadoop configuration files &lt; -D Hadoop 
property &lt; -P properties_file &lt; set command.</p>
@@ -577,8 +577,8 @@ $ export PIG_HOME=/&lt;my-path-to-pig&gt
 <li>Create the pigtutorial.tar.gz file:
 <ul>
     <li>Move to the Pig tutorial directory (.../pig-0.16.0/tutorial).</li>
-       <li>Run the "ant" command from the tutorial directory. This will create 
the pigtutorial.tar.gz file.
-       </li>
+    <li>Run the "ant" command from the tutorial directory. This will create 
the pigtutorial.tar.gz file.
+    </li>
 </ul>
 
 </li>
@@ -646,7 +646,7 @@ export PIG_CLASSPATH=/mycluster/conf
 <source>
 export PIG_CLASSPATH=/mycluster/conf:/tez/conf
 </source>
-<p>If you are using Spark, you will also need to specify SPARK_HOME and 
specify SPARK_JAR which is the hdfs location where you upload 
$SPARK_HOME/lib/spark-assembly*.jar:</p>
+<p>If you are using Spark, you will also need to specify SPARK_HOME and 
specify SPARK_JAR which is the hdfs location where you uploaded 
$SPARK_HOME/lib/spark-assembly*.jar:</p>
 <source>export SPARK_HOME=/mysparkhome/; export 
SPARK_JAR=hdfs://example.com:8020/spark-assembly*.jar</source>
 <p><strong>Note:</strong> The PIG_CLASSPATH can also be used to add any other 
3rd party dependencies or resource files a pig script may require. If there is 
also a need to make the added entries take the highest precedence in the Pig 
JVM's classpath order, one may also set the env-var PIG_USER_CLASSPATH_FIRST to 
any value, such as 'true' (and unset the env-var to disable).</p></li>
 <li>Set the HADOOP_CONF_DIR environment variable to the location of the 
cluster configuration directory:

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Wed Apr 12 
02:20:20 2017
@@ -506,6 +506,13 @@ public class PigConfiguration {
      */
     public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = 
"pig.tez.configure.am.memory";
 
+    /**
+     * If set to false, automatic schema disambiguation gets disabled i.e. 
group::name will be just name
+     */
+    public static final String PIG_STORE_SCHEMA_DISAMBIGUATE = 
"pig.store.schema.disambiguate";
+
+    public static final String PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT = "true";
+
     // Deprecated settings of Pig 0.13
 
     /**

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
 Wed Apr 12 02:20:20 2017
@@ -35,6 +35,6 @@ public class AccumulatorOptimizer extend
     }
 
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        AccumulatorOptimizerUtil.addAccumulator(mr.reducePlan);
+        AccumulatorOptimizerUtil.addAccumulator(mr.reducePlan, 
mr.reducePlan.getRoots());
     }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
 Wed Apr 12 02:20:20 2017
@@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
@@ -69,7 +68,7 @@ class NoopFilterRemover extends MROpPlan
         public void visit() throws VisitorException {
             super.visit();
             for (Pair<POFilter, PhysicalPlan> pair: removalQ) {
-                removeFilter(pair.first, pair.second);
+                NoopFilterRemoverUtil.removeFilter(pair.first, pair.second);
             }
             removalQ.clear();
         }
@@ -91,23 +90,5 @@ class NoopFilterRemover extends MROpPlan
                 }
             }
         }
-        
-        private void removeFilter(POFilter filter, PhysicalPlan plan) {
-            if (plan.size() > 1) {
-                try {
-                    List<PhysicalOperator> fInputs = filter.getInputs();
-                    List<PhysicalOperator> sucs = plan.getSuccessors(filter);
-
-                    plan.removeAndReconnect(filter);
-                    if(sucs!=null && sucs.size()!=0){
-                        for (PhysicalOperator suc : sucs) {
-                            suc.setInputs(fInputs);
-                        }
-                    }
-                } catch (PlanException pe) {
-                    log.info("Couldn't remove a filter in optimizer: 
"+pe.getMessage());
-                }
-            }
-        }
     }
 }

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java?rev=1791060&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java
 Wed Apr 12 02:20:20 2017
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.impl.plan.PlanException;
+
+import java.util.List;
+
+public class NoopFilterRemoverUtil {
+    private static Log log = LogFactory.getLog(NoopFilterRemoverUtil.class);
+
+    public static void removeFilter(POFilter filter, PhysicalPlan plan) {
+        if (plan.size() > 1) {
+            try {
+                List<PhysicalOperator> fInputs = filter.getInputs();
+                List<PhysicalOperator> sucs = plan.getSuccessors(filter);
+
+                plan.removeAndReconnect(filter);
+                if(sucs!=null && sucs.size()!=0){
+                    for (PhysicalOperator suc : sucs) {
+                        suc.setInputs(fInputs);
+                    }
+                }
+            } catch (PlanException pe) {
+                log.info("Couldn't remove a filter in optimizer: 
"+pe.getMessage());
+            }
+        }
+    }
+}

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
 Wed Apr 12 02:20:20 2017
@@ -52,7 +52,8 @@ public class SecondaryKeyOptimizerMR ext
         if (mr.getCustomPartitioner()!=null)
             return;
 
-        info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mr.mapPlan, 
mr.reducePlan);
+        SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new 
SecondaryKeyOptimizerUtil();
+        info = secondaryKeyOptUtil.applySecondaryKeySort(mr.mapPlan, 
mr.reducePlan);
         if (info != null && info.isUseSecondaryKey()) {
             mr.setUseSecondaryKey(true);
             mr.setSecondarySortOrder(info.getSecondarySortOrder());

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 Wed Apr 12 02:20:20 2017
@@ -74,7 +74,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
-import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -369,8 +368,4 @@ public class PhyPlanVisitor extends Plan
     public void visitBroadcastSpark(POBroadcastSpark poBroadcastSpark) {
     }
 
-       public void visitPoissonSampleSpark(
-                       POPoissonSampleSpark poPoissonSampleSpark) {
-               // TODO Auto-generated method stub
-       }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
 Wed Apr 12 02:20:20 2017
@@ -71,7 +71,7 @@ public class POCollectedGroup extends Ph
     private transient boolean useDefaultBag;
 
     //For Spark
-    private boolean endOfInput = false;
+    private transient boolean endOfInput = false;
     public boolean isEndOfInput() {
         return endOfInput;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
 Wed Apr 12 02:20:20 2017
@@ -123,7 +123,7 @@ public class POMergeCogroup extends Phys
     }
 
     //For Spark
-    private boolean endOfInput = false;
+    private transient boolean endOfInput = false;
     public boolean isEndOfInput() {
         return endOfInput;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 Wed Apr 12 02:20:20 2017
@@ -136,7 +136,7 @@ public class POMergeJoin extends Physica
     // The old flag parentPlan.endOfAllInput doesn't work in spark mode, 
because it is shared
     // between operators in the same plan, so it could be set by preceding 
operators even
     // current operator does not reach at its end. (see PIG-4876)
-    private boolean endOfInput = false;
+    private transient boolean endOfInput = false;
     public boolean isEndOfInput() {
         return endOfInput;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
 Wed Apr 12 02:20:20 2017
@@ -29,7 +29,7 @@ import org.apache.pig.impl.plan.VisitorE
 
 public class POPoissonSample extends PhysicalOperator {
 
-    private static final long serialVersionUID = 1L;
+    protected static final long serialVersionUID = 1L;
 
     // 17 is not a magic number. It can be obtained by using a poisson
     // cumulative distribution function with the mean set to 10 (empirically,

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
 Wed Apr 12 02:20:20 2017
@@ -44,10 +44,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
@@ -64,7 +61,6 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -110,14 +106,8 @@ public class JobGraphBuilder extends Spa
             setReplicationForMergeJoin(sparkOp.physicalPlan);
             sparkOperToRDD(sparkOp);
             finishUDFs(sparkOp.physicalPlan);
-        } catch (InterruptedException e) {
-            throw new RuntimeException("fail to get the rdds of this spark 
operator: ", e);
-        } catch (JobCreationException e) {
-            throw new RuntimeException("fail to get the rdds of this spark 
operator: ", e);
-        } catch (ExecException e) {
-            throw new RuntimeException("fail to get the rdds of this spark 
operator: ", e);
-        } catch (IOException e) {
-            throw new RuntimeException("fail to get the rdds of this spark 
operator: ", e);
+        } catch (Exception e) {
+            throw new VisitorException("fail to get the rdds of this spark 
operator: ", e);
         }
     }
 
@@ -265,14 +255,14 @@ public class JobGraphBuilder extends Spa
 
         if (physicalOperator instanceof POSplit) {
             List<PhysicalPlan> successorPlans = ((POSplit) 
physicalOperator).getPlans();
-            for (PhysicalPlan successPlan : successorPlans) {
-                List<PhysicalOperator> leavesOfSuccessPlan = 
successPlan.getLeaves();
+            for (PhysicalPlan succcessorPlan : successorPlans) {
+                List<PhysicalOperator> leavesOfSuccessPlan = 
succcessorPlan.getLeaves();
                 if (leavesOfSuccessPlan.size() != 1) {
-                    LOG.error("the size of leaves of SuccessPlan should be 1");
-                    break;
+                    LOG.error("the size of leaves of successorPlan should be 
1");
+                    throw new RuntimeException("the size of the leaves of 
successorPlan should be 1");
                 }
                 PhysicalOperator leafOfSuccessPlan = 
leavesOfSuccessPlan.get(0);
-                physicalToRDD(sparkOperator, successPlan, leafOfSuccessPlan, 
operatorKeysOfAllPreds);
+                physicalToRDD(sparkOperator, succcessorPlan, 
leafOfSuccessPlan, operatorKeysOfAllPreds);
             }
         } else {
             RDDConverter converter = 
convertMap.get(physicalOperator.getClass());
@@ -403,7 +393,7 @@ public class JobGraphBuilder extends Spa
                 && physicalOperator instanceof POPoissonSampleSpark) {
             // set the runtime #reducer of the next job as the #partition
 
-            int defaultParallelism = SparkUtil.getParallelism(allPredRDDs, 
physicalOperator);
+            int defaultParallelism = 
SparkPigContext.get().getParallelism(allPredRDDs, physicalOperator);
 
             ParallelConstantVisitor visitor =
                     new ParallelConstantVisitor(sparkOperator.physicalPlan, 
defaultParallelism);
@@ -446,7 +436,7 @@ public class JobGraphBuilder extends Spa
         }
 
         @Override
-        public void visitPoissonSampleSpark(POPoissonSampleSpark po) {
+        public void visitPoissonSample(POPoissonSample po) {
             isAfterSampleOperator = true;
         }
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
 Wed Apr 12 02:20:20 2017
@@ -60,7 +60,28 @@ public class JobMetricsListener implemen
 
     @Override
     public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
-
+//        uncomment and remove the code onTaskEnd until we fix PIG-5157. It is 
better to update taskMetrics of stage when stage completes
+//        if we update taskMetrics in onTaskEnd(), it consumes lot of memory.
+//        int stageId = stageCompleted.stageInfo().stageId();
+//        int stageAttemptId = stageCompleted.stageInfo().attemptId();
+//        String stageIdentifier = stageId + "_" + stageAttemptId;
+//        Integer jobId = stageIdToJobId.get(stageId);
+//        if (jobId == null) {
+//            LOG.warn("Cannot find job id for stage[" + stageId + "].");
+//        } else {
+//            Map<String, List<TaskMetrics>> jobMetrics = 
allJobMetrics.get(jobId);
+//            if (jobMetrics == null) {
+//                jobMetrics = Maps.newHashMap();
+//                allJobMetrics.put(jobId, jobMetrics);
+//            }
+//            List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+//            if (stageMetrics == null) {
+//                stageMetrics = Lists.newLinkedList();
+//                jobMetrics.put(stageIdentifier, stageMetrics);
+//            }
+//            // uncomment until we fix PIG-5157. after we upgrade to spark2.0 
StageInfo().taskMetrics() api is available
+//            // stageMetrics.add(stageCompleted.stageInfo().taskMetrics());
+//        }
     }
 
     @Override

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java
 Wed Apr 12 02:20:20 2017
@@ -26,11 +26,8 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.mapred.JobConf;
 
-import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
@@ -40,21 +37,6 @@ import com.esotericsoftware.kryo.io.Outp
 public class KryoSerializer {
     private static final Log LOG = LogFactory.getLog(KryoSerializer.class);
 
-    public static byte[] serialize(Object object) {
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-        Output output = new Output(stream);
-
-        Utilities.runtimeSerializationKryo.get().writeObject(output, object);
-
-        output.close(); // close() also calls flush()
-        return stream.toByteArray();
-    }
-
-    public static <T> T deserialize(byte[] buffer, Class<T> clazz) {
-        return Utilities.runtimeSerializationKryo.get().readObject(
-                new Input(new ByteArrayInputStream(buffer)), clazz);
-    }
-
     public static byte[] serializeJobConf(JobConf jobConf) {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         try {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
 Wed Apr 12 02:20:20 2017
@@ -118,28 +118,10 @@ public class MapReducePartitionerWrapper
             int partition = (Integer) 
getPartitionMethod.invoke(mapredPartitioner,
                     writeableKey, null, numPartitions);
 
-            if (LOG.isDebugEnabled())
-                LOG.debug("MapReduce Partitioner partition number for key " + 
key +
-                        " is " + partition);
             return partition;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    public boolean equals(Object other) {
-        boolean var4;
-        if(other instanceof MapReducePartitionerWrapper) {
-            MapReducePartitionerWrapper var3 = 
(MapReducePartitionerWrapper)other;
-            var4 = var3.numPartitions() == this.numPartitions();
-        } else {
-            var4 = false;
-        }
-
-        return var4;
-    }
-
-    public int hashCode() {
-        return this.numPartitions();
-    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
 Wed Apr 12 02:20:20 2017
@@ -45,10 +45,10 @@ import com.google.common.collect.Lists;
  */
 public class SparkEngineConf implements Serializable {
 
-    private static final Log log = LogFactory.getLog(SparkEngineConf.class);
-    private static String SPARK_UDF_IMPORT_LIST= "spark.udf.import.list";
-    private static String 
SPARK_UDFCONTEXT_UDFCONFS="spark.udfcontext.udfConfs";
-    private static String SPARK_UDFCONTEXT_CLIENTSYSPROPS= 
"spark.udfcontext.clientSysProps";
+    private static final Log LOG = LogFactory.getLog(SparkEngineConf.class);
+    private static String SPARK_UDF_IMPORT_LIST = "pig.spark.udf.import.list";
+    private static String SPARK_UDFCONTEXT_UDFCONFS = 
"pig.spark.udfcontext.udfConfs";
+    private static String SPARK_UDFCONTEXT_CLIENTSYSPROPS = 
"pig.spark.udfcontext.clientSysProps";
 
     private Properties properties = new Properties();
 
@@ -82,8 +82,8 @@ public class SparkEngineConf implements
             //jobs(at that time, UDFContext#udfConfs and 
UDFContext#clientSysProps is null so we need to save their
             //value in SparkEngineConf#properties after these two variables 
are correctly initialized in
             //SparkUtil#newJobConf, More detailed see PIG-4920
-            String udfConfsStr = 
ObjectSerializer.serialize(UDFContext.getUDFContext().getUdfConfs());
-            String clientSysPropsStr = 
ObjectSerializer.serialize(UDFContext.getUDFContext().getClientSysProps());
+            String udfConfsStr = UDFContext.getUDFContext().serialize();
+            String clientSysPropsStr = 
ObjectSerializer.serialize(UDFContext.getUDFContext().getClientSystemProps());
             this.properties.setProperty(SPARK_UDFCONTEXT_UDFCONFS, 
udfConfsStr);
             this.properties.setProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS, 
clientSysPropsStr);
             out.writeObject(udfConfsStr);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Wed Apr 12 02:20:20 2017
@@ -163,8 +163,7 @@ public class SparkLauncher extends Launc
         this.pigContext = pigContext;
         initialize(physicalPlan);
         SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
-        if (LOG.isDebugEnabled())
-            explain(sparkplan, System.out, "text", true);
+        LOG.info(sparkplan);
         SparkPigStats sparkStats = (SparkPigStats) pigContext
                 .getExecutionEngine().instantiatePigStats();
         sparkStats.initialize(pigContext, sparkplan, jobConf);
@@ -199,7 +198,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POPackage.class, new PackageConverter());
         convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
         convertMap.put(POGlobalRearrangeSpark.class, new 
GlobalRearrangeConverter());
-           convertMap.put(POJoinGroupSpark.class, new 
JoinGroupSparkConverter());
+        convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter());
         convertMap.put(POLimit.class, new LimitConverter());
         convertMap.put(PODistinct.class, new DistinctConverter());
         convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
@@ -246,8 +245,8 @@ public class SparkLauncher extends Launc
             CombinerOptimizer combinerOptimizer = new CombinerOptimizer(plan);
             combinerOptimizer.visit();
             if (LOG.isDebugEnabled()) {
-                System.out.println("after combiner optimization:");
-                explain(plan, System.out, "text", true);
+                LOG.debug("After combiner optimization:");
+                LOG.debug(plan);
             }
         }
 
@@ -271,8 +270,8 @@ public class SparkLauncher extends Launc
         boolean isMultiQuery = 
conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
 
         if (LOG.isDebugEnabled()) {
-            System.out.println("before multiquery optimization:");
-            explain(plan, System.out, "text", true);
+            LOG.debug("Before multiquery optimization:");
+            LOG.debug(plan);
         }
 
         if (isMultiQuery) {
@@ -289,13 +288,13 @@ public class SparkLauncher extends Launc
         joinOptimizer.visit();
 
         if (LOG.isDebugEnabled()) {
-            System.out.println("after multiquery optimization:");
-            explain(plan, System.out, "text", true);
+            LOG.debug("After multiquery optimization:");
+            LOG.debug(plan);
         }
     }
 
     private void cleanUpSparkJob(SparkPigStats sparkStats) throws 
ExecException {
-        LOG.info("clean up Spark Job");
+        LOG.info("Clean up Spark Job");
         boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
                 .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
         if (isLocal) {
@@ -307,7 +306,7 @@ public class SparkLauncher extends Launc
                     File deleteFile = new File(currentDirectoryPath + "/"
                             + shipFile.getName());
                     if (deleteFile.exists()) {
-                        LOG.info(String.format("delete ship file result: %b",
+                        LOG.info(String.format("Delete ship file result: %b",
                                 deleteFile.delete()));
                     }
                 }
@@ -320,7 +319,7 @@ public class SparkLauncher extends Launc
                     File deleteFile = new File(currentDirectoryPath + "/"
                             + fileName);
                     if (deleteFile.exists()) {
-                        LOG.info(String.format("delete cache file result: %b",
+                        LOG.info(String.format("Delete cache file result: %b",
                                 deleteFile.delete()));
                     }
                 }
@@ -351,7 +350,7 @@ public class SparkLauncher extends Launc
     }
 
     private void addFilesToSparkJob(SparkOperPlan sparkPlan) throws 
IOException {
-        LOG.info("add files Spark Job");
+        LOG.info("Add files Spark Job");
         String shipFiles = pigContext.getProperties().getProperty(
                 "pig.streaming.ship.files");
         shipFiles(shipFiles);
@@ -377,7 +376,6 @@ public class SparkLauncher extends Launc
             for (String file : shipFiles.split(",")) {
                 File shipFile = new File(file.trim());
                 if (shipFile.exists()) {
-                    LOG.info(String.format("shipFile:%s", shipFile));
                     addResourceToSparkJobWorkingDirectory(shipFile,
                             shipFile.getName(), ResourceType.FILE);
                 }
@@ -397,7 +395,7 @@ public class SparkLauncher extends Launc
                 FileSystem fs = tmpFilePath.getFileSystem(jobConf);
                 fs.copyToLocalFile(src, tmpFilePath);
                 tmpFile.deleteOnExit();
-                LOG.info(String.format("cacheFile:%s", fileName));
+                LOG.info(String.format("CacheFile:%s", fileName));
                 addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
                         ResourceType.FILE);
             }
@@ -412,14 +410,14 @@ public class SparkLauncher extends Launc
 
     private void addJarsToSparkJob(SparkOperPlan sparkPlan) throws IOException 
{
         Set<String> allJars = new HashSet<String>();
-        LOG.info("add default jars to Spark Job");
+        LOG.info("Add default jars to Spark Job");
         allJars.addAll(JarManager.getDefaultJars());
-        LOG.info("add extra jars to Spark Job");
+        LOG.info("Add extra jars to Spark Job");
         for (String scriptJar : pigContext.scriptJars) {
             allJars.add(scriptJar);
         }
 
-        LOG.info("add udf jars to Spark Job");
+        LOG.info("Add udf jars to Spark Job");
         UDFJarsFinder udfJarsFinder = new UDFJarsFinder(sparkPlan, pigContext);
         udfJarsFinder.visit();
         Set<String> udfJars = udfJarsFinder.getUdfJars();
@@ -429,7 +427,7 @@ public class SparkLauncher extends Launc
 
         File scriptUDFJarFile = JarManager.createPigScriptUDFJar(pigContext);
         if (scriptUDFJarFile != null) {
-            LOG.info("add script udf jar to Spark job");
+            LOG.info("Add script udf jar to Spark job");
             allJars.add(scriptUDFJarFile.getAbsolutePath().toString());
         }
 
@@ -461,11 +459,11 @@ public class SparkLauncher extends Launc
             synchronized(SparkLauncher.class) {
                 if (localFile.exists()) {
                     LOG.info(String.format(
-                            "jar file %s exists, ready to delete",
+                            "Jar file %s exists, ready to delete",
                             localFile.getAbsolutePath()));
                     localFile.delete();
                 } else {
-                    LOG.info(String.format("jar file %s not exists,",
+                    LOG.info(String.format("Jar file %s not exists,",
                             localFile.getAbsolutePath()));
                 }
                 Files.copy(Paths.get(new 
Path(resourcePath.getAbsolutePath()).toString()),
@@ -490,7 +488,7 @@ public class SparkLauncher extends Launc
             throw new RuntimeException("cache file is invalid format, file:"
                     + cacheFileUrl);
         } else {
-            LOG.debug("cache file name is valid:" + cacheFileUrl);
+            LOG.debug("Cache file name is valid:" + cacheFileUrl);
             return fileName;
         }
     }
@@ -503,12 +501,12 @@ public class SparkLauncher extends Launc
             throw new RuntimeException("cache file is invalid format, file:"
                     + cacheFileUrl);
         } else {
-            LOG.debug("cache file name is valid:" + cacheFileUrl);
+            LOG.debug("Cache file name is valid:" + cacheFileUrl);
             return fileName;
         }
     }
 
-    public SparkOperPlan compile(PhysicalPlan physicalPlan,
+    private SparkOperPlan compile(PhysicalPlan physicalPlan,
                                   PigContext pigContext) throws PlanException, 
IOException,
             VisitorException {
         SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
@@ -558,13 +556,7 @@ public class SparkLauncher extends Launc
             Properties pigCtxtProperties = pc.getProperties();
 
             sparkConf.setMaster(master);
-            sparkConf.setAppName("PigOnSpark:" + 
pigCtxtProperties.getProperty(PigContext.JOB_NAME));
-            // On Spark 1.6, Netty file server doesn't allow adding the same 
file with the same name twice
-            // This is a problem for streaming using a script + explicit ship 
the same script combination (PIG-5134)
-            // HTTP file server doesn't have this restriction, it overwrites 
the file if added twice
-            String useNettyFileServer = 
pigCtxtProperties.getProperty(PigConfiguration.PIG_SPARK_USE_NETTY_FILESERVER, 
"false");
-            sparkConf.set("spark.rpc.useNettyFileServer", useNettyFileServer);
-
+            
sparkConf.setAppName(pigCtxtProperties.getProperty(PigContext.JOB_NAME,"pig"));
             if (sparkHome != null && !sparkHome.isEmpty()) {
                 sparkConf.setSparkHome(sparkHome);
             } else {
@@ -575,11 +567,12 @@ public class SparkLauncher extends Launc
             for (String key : pigCtxtProperties.stringPropertyNames()) {
                 if (key.startsWith("spark.")) {
                     LOG.debug("Copying key " + key + " with value " +
-                        pigCtxtProperties.getProperty(key) + " to SparkConf");
+                            pigCtxtProperties.getProperty(key) + " to 
SparkConf");
                     sparkConf.set(key, pigCtxtProperties.getProperty(key));
                 }
             }
 
+            //see PIG-5200 why need to set spark.executor.userClassPathFirst 
as true
             sparkConf.set("spark.executor.userClassPathFirst", "true");
             checkAndConfigureDynamicAllocation(master, sparkConf);
 
@@ -594,11 +587,11 @@ public class SparkLauncher extends Launc
         if (sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
             if (!master.startsWith("yarn")) {
                 LOG.warn("Dynamic allocation is enabled, but " +
-                    "script isn't running on yarn. Ignoring ...");
+                        "script isn't running on yarn. Ignoring ...");
             }
             if (!sparkConf.getBoolean("spark.shuffle.service.enabled", false)) 
{
                 LOG.info("Spark shuffle service is being enabled as dynamic " +
-                    "allocation is enabled");
+                        "allocation is enabled");
                 sparkConf.set("spark.shuffle.service.enabled", "true");
             }
         }
@@ -624,7 +617,7 @@ public class SparkLauncher extends Launc
                          String format, boolean verbose)
             throws IOException {
         Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys();
-        List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
+        List<OperatorKey> operKeyList = new ArrayList<>(allOperKeys.keySet());
         Collections.sort(operKeyList);
 
         if (format.equals("text")) {
@@ -671,15 +664,19 @@ public class SparkLauncher extends Launc
 
     @Override
     public void kill() throws BackendException {
-        // TODO Auto-generated method stub
-
+        if (sparkContext != null) {
+            sparkContext.stop();
+            sparkContext = null;
+        }
     }
 
     @Override
     public void killJob(String jobID, Configuration conf)
             throws BackendException {
-        // TODO Auto-generated method stub
-
+        if (sparkContext != null) {
+            sparkContext.stop();
+            sparkContext = null;
+        }
     }
 
     /**
@@ -700,7 +697,7 @@ public class SparkLauncher extends Launc
         PigMapReduce.sJobConfInternal.set(jobConf);
         String parallelism = 
pigContext.getProperties().getProperty("spark.default.parallelism");
         if (parallelism != null) {
-            
SparkUtil.setSparkDefaultParallelism(Integer.parseInt(parallelism));
+            
SparkPigContext.get().setPigDefaultParallelism(Integer.parseInt(parallelism));
         }
     }
 }

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java?rev=1791060&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
 Wed Apr 12 02:20:20 2017
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.rdd.RDD;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * singleton class like PigContext
+ */
+public class SparkPigContext {
+
+    private static SparkPigContext context =  null;
+    private static ThreadLocal<Integer> PIG_DEFAULT_PARALLELISM = null;
+    private static ConcurrentHashMap<String, Broadcast<List<Tuple>>> 
broadcastedVars = new ConcurrentHashMap() ;
+
+    public static SparkPigContext get(){
+        if( context == null){
+            context = new SparkPigContext();
+        }
+        return context;
+    }
+    public static int getPigDefaultParallelism() {
+        return PIG_DEFAULT_PARALLELISM.get();
+    }
+
+
+    public static int getParallelism(List<RDD<Tuple>> predecessors,
+                                     PhysicalOperator physicalOperator) {
+        if (PIG_DEFAULT_PARALLELISM != null) {
+           return getPigDefaultParallelism();
+        }
+
+        int parallelism = physicalOperator.getRequestedParallelism();
+        if (parallelism <= 0) {
+            //Spark automatically sets the number of "map" tasks to run on 
each file according to its size (though
+            // you can control it through optional parameters to 
SparkContext.textFile, etc), and for distributed
+            //"reduce" operations, such as groupByKey and reduceByKey, it uses 
the largest parent RDD's number of
+            // partitions.
+            int maxParallism = 0;
+            for (int i = 0; i < predecessors.size(); i++) {
+                int tmpParallelism = predecessors.get(i).getNumPartitions();
+                if (tmpParallelism > maxParallism) {
+                    maxParallism = tmpParallelism;
+                }
+            }
+            parallelism = maxParallism;
+        }
+        return parallelism;
+    }
+
+    public static void setPigDefaultParallelism(int pigDefaultParallelism) {
+        PIG_DEFAULT_PARALLELISM.set(pigDefaultParallelism);
+    }
+
+     public static ConcurrentHashMap<String, Broadcast<List<Tuple>>> 
getBroadcastedVars() {
+        return broadcastedVars;
+    }
+}

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
 Wed Apr 12 02:20:20 2017
@@ -58,10 +58,6 @@ import org.apache.spark.broadcast.Broadc
 import org.apache.spark.rdd.RDD;
 
 public class SparkUtil {
-
-    private static ThreadLocal<Integer> SPARK_DEFAULT_PARALLELISM = null;
-    private static ConcurrentHashMap<String, Broadcast<List<Tuple>>> 
broadcastedVars = new ConcurrentHashMap() ;
-
     public static <T> ClassTag<T> getManifest(Class<T> clazz) {
         return ClassTag$.MODULE$.apply(clazz);
     }
@@ -90,6 +86,7 @@ public class SparkUtil {
         // but we still need store it in jobConf because it will be used in 
PigOutputFormat#setupUdfEnvAndStores
         jobConf.set("udf.import.list",
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
+
         Random rand = new Random();
         jobConf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, 
Integer.toString(rand.nextInt()));
         jobConf.set(PigConstants.LOCAL_CODE_DIR,
@@ -129,30 +126,6 @@ public class SparkUtil {
         }
     }
 
-    public static int getParallelism(List<RDD<Tuple>> predecessors,
-            PhysicalOperator physicalOperator) {
-        if (SPARK_DEFAULT_PARALLELISM != null) {
-            return getSparkDefaultParallelism();
-        }
-
-        int parallelism = physicalOperator.getRequestedParallelism();
-        if (parallelism <= 0) {
-            //Spark automatically sets the number of "map" tasks to run on 
each file according to its size (though
-            // you can control it through optional parameters to 
SparkContext.textFile, etc), and for distributed
-            //"reduce" operations, such as groupByKey and reduceByKey, it uses 
the largest parent RDD's number of
-            // partitions.
-            int maxParallism = 0;
-            for (int i = 0; i < predecessors.size(); i++) {
-                int tmpParallelism = predecessors.get(i).getNumPartitions();
-                if (tmpParallelism > maxParallism) {
-                    maxParallism = tmpParallelism;
-                }
-            }
-            parallelism = maxParallism;
-        }
-        return parallelism;
-    }
-
     public static Partitioner getPartitioner(String customPartitioner, int 
parallelism) {
         if (customPartitioner == null) {
             return new HashPartitioner(parallelism);
@@ -182,16 +155,6 @@ public class SparkUtil {
         baseSparkOp.physicalPlan.addAsLeaf(sort);
     }
 
-    static public ConcurrentHashMap<String, Broadcast<List<Tuple>>> 
getBroadcastedVars() {
-        return broadcastedVars;
-    }
-
 
-    public static int getSparkDefaultParallelism() {
-        return SPARK_DEFAULT_PARALLELISM.get();
-    }
 
-    public static void setSparkDefaultParallelism(int sparkDefaultParallelism) 
{
-        SPARK_DEFAULT_PARALLELISM.set(sparkDefaultParallelism);
-    }
 }
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
 Wed Apr 12 02:20:20 2017
@@ -51,7 +51,7 @@ public class UDFJarsFinder extends Spark
                     }
                 }
             } catch (IOException e) {
-                throw new RuntimeException("pigContext.getClassForAlias(udf) 
fail, ", e);
+                throw new VisitorException(e);
             }
         }
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
 Wed Apr 12 02:20:20 2017
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.JavaRDD;
@@ -46,7 +47,7 @@ public class BroadcastConverter implemen
 
         // Save the broadcast variable to broadcastedVars map, so that this
         // broadcasted variable can be referenced by the driver client.
-        SparkUtil.getBroadcastedVars().put(po.getBroadcastedVariableName(), 
broadcastedRDD);
+        
SparkPigContext.get().getBroadcastedVars().put(po.getBroadcastedVariableName(), 
broadcastedRDD);
 
         return rdd;
     }


Reply via email to