Author: knoguchi
Date: Tue May 14 20:37:37 2024
New Revision: 1917723

URL: http://svn.apache.org/viewvc?rev=1917723&view=rev
Log:
PIG-5439: Support Spark 3 and drop SparkShim (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries-h2.properties
    pig/trunk/ivy/libraries-h3.properties
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/trunk/test/org/apache/pig/test/TestStoreBase.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 14 20:37:37 2024
@@ -24,6 +24,7 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi)
+PIG-5439: Support Spark 3 and drop SparkShim (knoguchi)
  
 OPTIMIZATIONS
  

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Tue May 14 20:37:37 2024
@@ -245,13 +245,9 @@
         </then>
     </if>
     <property name="hbaseversion" value="1" />
-    <property name="sparkversion" value="1" />
+    <property name="sparkversion" value="2" />
     <property name="hiveversion" value="1" />
 
-    <condition property="src.exclude.dir" value="**/Spark2*.java" 
else="**/Spark1*.java">
-        <equals arg1="${sparkversion}" arg2="1"/>
-    </condition>
-
     <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
 
     <property name="src.shims.dir" value="${basedir}/shims/src/hadoop2" />
@@ -586,7 +582,7 @@
         </copy>
     </target>
 
-    <target name="compile-test" depends="jar-simple, ivy-test">
+    <target name="compile-test" depends="jar, ivy-test">
         <echo>*** Building Test Sources ***</echo>
         <echo>*** To compile with all warnings enabled, supply 
-Dall.warnings=1 on command line ***</echo>
         <echo>*** Else, you will only be warned about deprecations ***</echo>
@@ -710,7 +706,7 @@
     <!-- ================================================================== -->
     <!-- Make pig.jar                                                       -->
     <!-- ================================================================== -->
-    <target name="jar-simple" depends="compile,ivy-buildJar" 
description="Create pig core jar">
+    <target name="jar" depends="compile,ivy-buildJar" description="Create pig 
core jar">
         <buildJar svnString="${svn.revision}" 
outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/>
         <buildJar svnString="${svn.revision}" 
outputFile="${output.jarfile.withouthadoop}" 
includedJars="runtime.dependencies-withouthadoop.jar"/>
         <antcall target="copyCommonDependencies"/>
@@ -764,6 +760,7 @@
         <mkdir dir="${spark.lib.dir}" />
         <copy todir="${spark.lib.dir}">
             <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="jackson-*.jar"/>
         </copy>
     </target>
 
@@ -858,44 +855,6 @@
         <buildJar svnString="${svn.revision}" 
outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/>
     </target>
 
-    <target name="jar" description="Create pig jar with Spark 1 and 2">
-        <echo>Compiling against Spark 2</echo>
-        <antcall target="clean-deps" inheritRefs="true" inheritall="true"/>
-        <propertyreset name="sparkversion" value="2"/>
-        <propertyreset name="src.exclude.dir" value="**/Spark1*.java" />
-        <antcall target="jar-core" inheritRefs="true" inheritall="true"/>
-        <move file="${output.jarfile.core}" 
tofile="${basedir}/_pig-shims.jar"/>
-
-        <echo>Compiling against Spark 1</echo>
-        <antcall target="clean-deps" inheritRefs="true" inheritall="true"/>
-        <propertyreset name="sparkversion" value="1"/>
-        <propertyreset name="src.exclude.dir" value="**/Spark2*.java" />
-        <antcall target="jar-simple" inheritRefs="true" inheritall="true"/>
-        <jar update="yes" jarfile="${output.jarfile.core}">
-            <zipfileset src="${basedir}/_pig-shims.jar" 
includes="**/Spark2*.class"/>
-        </jar>
-        <if>
-            <equals arg1="${isHadoop2}" arg2="true" />
-            <then>
-                <jar update="yes" 
jarfile="${output.jarfile.backcompat-core-h2}">
-                    <zipfileset src="${basedir}/_pig-shims.jar" 
includes="**/Spark2*.class"/>
-                </jar>
-                <jar update="yes" jarfile="${output.jarfile.withouthadoop-h2}">
-                    <zipfileset src="${basedir}/_pig-shims.jar" 
includes="**/Spark2*.class"/>
-                </jar>
-            </then>
-            <else>
-                <jar update="yes" 
jarfile="${output.jarfile.backcompat-core-h3}">
-                    <zipfileset src="${basedir}/_pig-shims.jar" 
includes="**/Spark2*.class"/>
-                </jar>
-                <jar update="yes" jarfile="${output.jarfile.withouthadoop-h3}">
-                    <zipfileset src="${basedir}/_pig-shims.jar" 
includes="**/Spark2*.class"/>
-                </jar>
-            </else>
-        </if>
-        <delete file="${basedir}/_pig-shims.jar"/>
-    </target>
-
     <!-- ================================================================== -->
     <!-- macrodef: buildJar                                                 -->
     <!-- ================================================================== -->

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Tue May 14 20:37:37 2024
@@ -42,8 +42,8 @@
     <conf name="hadoop3" visibility="private"/>
     <conf name="hbase1" visibility="private"/>
     <conf name="hbase2" visibility="private"/>
-    <conf name="spark1" visibility="private" />
     <conf name="spark2" visibility="private" />
+    <conf name="spark3" visibility="private" />
     <conf name="hive1" visibility="private"/>
     <conf name="hive3" visibility="private"/>
     <conf name="owasp" visibility="private" description="Artifacts required 
for owasp target"/>
@@ -239,9 +239,14 @@
     <dependency org="org.apache.ivy" name="ivy" rev="${ivy.version}"
       conf="compile->master"/>
     <dependency org="org.codehaus.jackson" name="jackson-mapper-asl" 
rev="${jackson.version}"
-      conf="compile->master"/>
+      conf="compile->master">
+      <exclude org="com.thoughtworks.paranamer" module="paranamer"/>
+    </dependency>
     <dependency org="org.codehaus.jackson" name="jackson-core-asl" 
rev="${jackson.version}"
-      conf="compile->master"/>
+      conf="compile->master">
+      <exclude org="com.thoughtworks.paranamer" module="paranamer"/>
+    </dependency>
+    <dependency org="com.thoughtworks.paranamer" name="paranamer" 
rev="${paranamer.version}" conf="compile->master"/>
     <dependency org="org.fusesource.jansi" name="jansi" rev="${jansi.version}"
       conf="compile->master"/>
     <dependency org="joda-time" name="joda-time" rev="${joda-time.version}" 
conf="compile->master"/>
@@ -615,26 +620,43 @@
 
     <dependency org="org.apache.parquet" name="parquet-pig-bundle" 
rev="${parquet-pig-bundle.version}" conf="compile->master"/>
 
-    <!-- for Spark 1.x integration -->
-    <dependency org="org.apache.spark" name="spark-core_2.11" 
rev="${spark1.version}" conf="spark1->default">
-        <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
-        <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
-        <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
-        <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/>
-        <exclude org="org.apache.hadoop" />
-        <exclude org="com.esotericsoftware.kryo" />
-        <exclude org="jline" module="jline"/>
-        <exclude org="com.google.guava" />
+    <!-- for Spark 2.x integration -->
+    <dependency org="org.apache.spark" 
name="spark-core_${spark2-scala.version}" rev="${spark2.version}" 
conf="spark2->default">
+      <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
+      <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
+      <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
+      <exclude org="com.fasterxml.jackson.core" module="jackson-annotations"/>
+      <exclude org="com.fasterxml.jackson.core" module="jackson-core"/>
+      <exclude org="com.fasterxml.jackson.core" module="jackson-databind"/>
+      <exclude org="com.fasterxml.jackson.module" 
module="jackson-module-scala_${spark-scala.version}"/>
+      <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/>
+      <exclude org="org.apache.hadoop" />
+      <exclude org="com.esotericsoftware.kryo" />
+      <exclude org="jline" module="jline"/>
+      <exclude org="com.google.guava" />
+      <exclude org="io.netty" module="netty"/>
+      <exclude org="io.netty" name="netty-all"/>
     </dependency>
-    <dependency org="org.apache.spark" name="spark-yarn_2.11" 
rev="${spark1.version}" conf="spark1->default">
-        <exclude org="org.apache.hadoop" />
+    <dependency org="org.apache.spark" 
name="spark-yarn_${spark2-scala.version}" rev="${spark2.version}" 
conf="spark2->default">
+      <exclude org="org.apache.hadoop" />
     </dependency>
+    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" 
conf="spark2->default;spark3->default"/>
+    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" 
conf="test->master"/>
+    <dependency org="org.scala-lang.modules" 
name="scala-xml_${spark2-scala.version}" rev="${scala-xml.version}" 
conf="spark2->default"/>
+    <dependency org="com.fasterxml.jackson.module" 
name="jackson-module-scala_${spark2-scala.version}" 
+      rev="${jackson-module-scala_spark2.version}" conf="spark2->default"/>
+    <dependency org="com.fasterxml.jackson.module" 
name="jackson-module-paranamer" 
+      rev="${jackson-module-scala_spark2.version}" conf="spark2->default"/>
 
     <!-- for Spark 2.x integration -->
-    <dependency org="org.apache.spark" name="spark-core_2.11" 
rev="${spark2.version}" conf="spark2->default">
+    <dependency org="org.apache.spark" 
name="spark-core_${spark3-scala.version}" rev="${spark3.version}" 
conf="spark3->default">
       <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
       <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
       <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
+      <exclude org="com.fasterxml.jackson.core" module="jackson-annotations"/>
+      <exclude org="com.fasterxml.jackson.core" module="jackson-core"/>
+      <exclude org="com.fasterxml.jackson.core" module="jackson-databind"/>
+      <exclude org="com.fasterxml.jackson.module" 
module="jackson-module-scala_${spark-scala.version}"/>
       <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/>
       <exclude org="org.apache.hadoop" />
       <exclude org="com.esotericsoftware.kryo" />
@@ -643,14 +665,18 @@
       <exclude org="io.netty" module="netty"/>
       <exclude org="io.netty" name="netty-all"/>
     </dependency>
-    <dependency org="org.apache.spark" name="spark-yarn_2.11" 
rev="${spark2.version}" conf="spark2->default">
+    <dependency org="org.apache.spark" 
name="spark-yarn_${spark3-scala.version}" rev="${spark3.version}" 
conf="spark3->default">
       <exclude org="org.apache.hadoop" />
     </dependency>
+    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" 
conf="spark3->default;spark3->default"/>
+    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" 
conf="test->master"/>
+    <dependency org="org.scala-lang.modules" 
name="scala-xml_${spark3-scala.version}" rev="${scala-xml.version}" 
conf="spark3->default"/>
+    <dependency org="com.fasterxml.jackson.module" 
name="jackson-module-scala_${spark3-scala.version}" 
+      rev="${jackson-module-scala_spark3.version}" conf="spark3->default"/>
+    <dependency org="com.fasterxml.jackson.module" 
name="jackson-module-paranamer" 
+      rev="${jackson-module-scala_spark3.version}" conf="spark3->default"/>
 
     <dependency org="asm" name="asm" rev="${asm.version}" 
conf="compile->master"/>
-    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" 
conf="spark1->default;spark2->default"/>
-    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" 
conf="test->master"/>
-    <dependency org="org.scala-lang.modules" name="scala-xml_2.11" 
rev="${scala-xml.version}" conf="spark1->default;spark2->default"/>
 
     <!-- for Tez integration -->
     <dependency org="org.apache.tez" name="tez" rev="${tez.version}"

Modified: pig/trunk/ivy/libraries-h2.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries-h2.properties?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/ivy/libraries-h2.properties (original)
+++ pig/trunk/ivy/libraries-h2.properties Tue May 14 20:37:37 2024
@@ -23,3 +23,5 @@ netty.version=3.10.6.Final
 netty-all.version=4.1.50.Final
 tez.version=0.9.2
 servlet-api.version=2.5
+spark-scala.version=2.11
+jackson-module-scala.version=2.10.2

Modified: pig/trunk/ivy/libraries-h3.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries-h3.properties?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/ivy/libraries-h3.properties (original)
+++ pig/trunk/ivy/libraries-h3.properties Tue May 14 20:37:37 2024
@@ -25,3 +25,5 @@ netty-all.version=4.1.77.Final
 re2j.version=1.0
 tez.version=0.10.2
 servlet-api.version=3.1.0
+spark-scala.version=2.12
+jackson-module-scala.version=2.10.2

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Tue May 14 20:37:37 2024
@@ -66,8 +66,12 @@ rats-lib.version=0.5.1
 slf4j-api.version=1.7.36
 slf4j-reload4j.version=1.7.36
 reload4j.version=1.2.24
-spark1.version=1.6.1
 spark2.version=2.4.8
+spark2-scala.version=2.11
+jackson-module-scala_spark2.version=2.9.10
+spark3.version=3.2.4
+spark3-scala.version=2.12
+jackson-module-scala_spark3.version=2.10.2
 xerces.version=2.10.0
 xalan.version=2.7.1
 wagon-http.version=1.0-beta-2
@@ -96,3 +100,4 @@ roaring-bitmap-shaded.version=0.7.14
 dependency-check-ant.version=7.4.4
 woodstox.version=5.3.0
 stax2-api.version=4.2.1
+paranamer.version=2.8

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java Tue May 14 20:37:37 2024
@@ -21,6 +21,8 @@ package org.apache.pig.backend.hadoop.ex
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.SparkListener;
 
@@ -40,8 +42,8 @@ public class JobStatisticCollector {
 
     public SparkListener getSparkListener() {
         if (sparkListener == null) {
-            sparkListener = SparkShims.getInstance()
-                    .getJobMetricsListener(jobIdToStageId, stageIdToJobId, 
allJobStatistics, finishedJobIds);
+            sparkListener = new JobMetricsListener(jobIdToStageId,
+                                      stageIdToJobId, allJobStatistics, 
finishedJobIds);
         }
         return sparkListener;
     }
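
The shim-provided listener above is replaced by a standalone JobMetricsListener class in the same package; that new file is added by this commit and does not appear in this diff. Below is a minimal sketch of what it presumably contains, assuming it mirrors the inner listener removed from Spark2Shims further down (details are inferred, not taken from the new file):

// Hypothetical reconstruction -- assumes the standalone JobMetricsListener mirrors
// the inner class deleted from Spark2Shims below; not copied from the new file.
package org.apache.pig.backend.hadoop.executionengine.spark;

import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;

public class JobMetricsListener extends SparkListener {
    private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);

    private final Map<Integer, int[]> jobIdToStageId;
    private final Map<Integer, Integer> stageIdToJobId;
    private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics;
    private final Set<Integer> finishedJobIds;

    public JobMetricsListener(Map<Integer, int[]> jobIdToStageId,
                              Map<Integer, Integer> stageIdToJobId,
                              Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics,
                              Set<Integer> finishedJobIds) {
        this.jobIdToStageId = jobIdToStageId;
        this.stageIdToJobId = stageIdToJobId;
        this.allJobMetrics = allJobMetrics;
        this.finishedJobIds = finishedJobIds;
    }

    @Override
    public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        // Record the completed stage's TaskMetrics under "<stageId>_<attempt>" for its job.
        // attemptNumber() is used since it is available on both Spark 2.4 and 3.x.
        int stageId = stageCompleted.stageInfo().stageId();
        String stageIdentifier = stageId + "_" + stageCompleted.stageInfo().attemptNumber();
        Integer jobId = stageIdToJobId.get(stageId);
        if (jobId == null) {
            LOG.warn("Cannot find job id for stage[" + stageId + "].");
            return;
        }
        Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
        if (jobMetrics == null) {
            jobMetrics = Maps.newHashMap();
            allJobMetrics.put(jobId, jobMetrics);
        }
        List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
        if (stageMetrics == null) {
            stageMetrics = Lists.newLinkedList();
            jobMetrics.put(stageIdentifier, stageMetrics);
        }
        stageMetrics.add(stageCompleted.stageInfo().taskMetrics());
    }

    @Override
    public synchronized void onJobStart(SparkListenerJobStart jobStart) {
        // Remember which stages belong to which job so metrics can be attributed later.
        int jobId = jobStart.jobId();
        int size = jobStart.stageIds().size();
        int[] intStageIds = new int[size];
        for (int i = 0; i < size; i++) {
            Integer stageId = (Integer) jobStart.stageIds().apply(i);
            intStageIds[i] = stageId;
            stageIdToJobId.put(stageId, jobId);
        }
        jobIdToStageId.put(jobId, intStageIds);
    }

    @Override
    public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
        // JobStatisticCollector waits on this listener; wake it up when a job finishes.
        finishedJobIds.add(jobEnd.jobId());
        notify();
    }
}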

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java Tue May 14 20:37:37 2024
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.spark;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.spark.SparkJobStats;
-import org.apache.pig.tools.pigstats.spark.Spark1JobStats;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerBlockUpdated;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
-import scala.Tuple2;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class Spark1Shims extends SparkShims {
-    @Override
-    public <T, R> FlatMapFunction<T, R> flatMapFunction(final 
FlatMapFunctionAdapter<T, R> function) {
-        return new FlatMapFunction<T, R>() {
-            @Override
-            public Iterable<R> call(final T t) throws Exception {
-                return new Iterable<R>() {
-                    @Override
-                    public Iterator<R> iterator() {
-                        try {
-                            return function.call(t);
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                };
-
-            }
-        };
-    }
-
-    @Override
-    public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final 
PairFlatMapFunctionAdapter<T, K, V> function) {
-        return new PairFlatMapFunction<T, K, V>() {
-            @Override
-            public Iterable<Tuple2<K, V>> call(final T t) throws Exception {
-                return new Iterable<Tuple2<K, V>>() {
-                    @Override
-                    public Iterator<Tuple2<K, V>> iterator() {
-                        try {
-                            return function.call(t);
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                };
-
-            }
-        };
-    }
-
-    @Override
-    public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean 
shuffle) {
-        return rdd.coalesce(numPartitions, shuffle, null);
-    }
-
-    @Override
-    public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, 
Configuration conf) {
-        return new Spark1JobStats(jobId, plan, conf);
-    }
-
-    @Override
-    public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, 
Configuration conf) {
-        return new Spark1JobStats(jobId, plan, conf);
-    }
-
-    @Override
-    public <T> OptionalWrapper<T> wrapOptional(T tuple) {
-        final Optional<T> t = (Optional<T>) tuple;
-
-        return new OptionalWrapper<T>() {
-            @Override
-            public boolean isPresent() {
-                return t.isPresent();
-            }
-
-            @Override
-            public T get() {
-                return t.get();
-            }
-        };
-    }
-
-    private static class JobMetricsListener implements SparkListener {
-        private final Log LOG = LogFactory.getLog(JobMetricsListener.class);
-
-        private Map<Integer, int[]> jobIdToStageId;
-        private Map<Integer, Integer> stageIdToJobId;
-        private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics;
-        private Set<Integer> finishedJobIds;
-
-        JobMetricsListener(final Map<Integer, int[]> jobIdToStageId,
-                           final Map<Integer, Integer> stageIdToJobId,
-                           final Map<Integer, Map<String, List<TaskMetrics>>> 
allJobMetrics,
-                           final Set<Integer> finishedJobIds) {
-            this.jobIdToStageId = jobIdToStageId;
-            this.stageIdToJobId = stageIdToJobId;
-            this.allJobMetrics = allJobMetrics;
-            this.finishedJobIds = finishedJobIds;
-        }
-
-        @Override
-        public void onStageCompleted(SparkListenerStageCompleted 
stageCompleted) {
-        }
-
-        @Override
-        public void onStageSubmitted(SparkListenerStageSubmitted 
stageSubmitted) {
-        }
-
-        @Override
-        public void onTaskStart(SparkListenerTaskStart taskStart) {
-        }
-
-        @Override
-        public void onTaskGettingResult(SparkListenerTaskGettingResult 
taskGettingResult) {
-        }
-
-        @Override
-        public void onExecutorRemoved(SparkListenerExecutorRemoved 
executorRemoved) {
-        }
-
-        @Override
-        public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
-        }
-
-        @Override
-        public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
-        }
-
-        @Override
-        public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-            int stageId = taskEnd.stageId();
-            int stageAttemptId = taskEnd.stageAttemptId();
-            String stageIdentifier = stageId + "_" + stageAttemptId;
-            Integer jobId = stageIdToJobId.get(stageId);
-            if (jobId == null) {
-                LOG.warn("Cannot find job id for stage[" + stageId + "].");
-            } else {
-                Map<String, List<TaskMetrics>> jobMetrics = 
allJobMetrics.get(jobId);
-                if (jobMetrics == null) {
-                    jobMetrics = Maps.newHashMap();
-                    allJobMetrics.put(jobId, jobMetrics);
-                }
-                List<TaskMetrics> stageMetrics = 
jobMetrics.get(stageIdentifier);
-                if (stageMetrics == null) {
-                    stageMetrics = Lists.newLinkedList();
-                    jobMetrics.put(stageIdentifier, stageMetrics);
-                }
-                stageMetrics.add(taskEnd.taskMetrics());
-            }
-        }
-
-        @Override
-        public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-            int jobId = jobStart.jobId();
-            int size = jobStart.stageIds().size();
-            int[] intStageIds = new int[size];
-            for (int i = 0; i < size; i++) {
-                Integer stageId = (Integer) jobStart.stageIds().apply(i);
-                intStageIds[i] = stageId;
-                stageIdToJobId.put(stageId, jobId);
-            }
-            jobIdToStageId.put(jobId, intStageIds);
-        }
-
-        @Override
-        public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
-            finishedJobIds.add(jobEnd.jobId());
-            notify();
-        }
-
-        @Override
-        public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate 
environmentUpdate) {
-        }
-
-        @Override
-        public void onBlockManagerAdded(SparkListenerBlockManagerAdded 
blockManagerAdded) {
-        }
-
-        @Override
-        public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved 
blockManagerRemoved) {
-        }
-
-        @Override
-        public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-        }
-
-        @Override
-        public void onApplicationStart(SparkListenerApplicationStart 
applicationStart) {
-        }
-
-        @Override
-        public void onApplicationEnd(SparkListenerApplicationEnd 
applicationEnd) {
-        }
-
-        @Override
-        public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate 
executorMetricsUpdate) {
-        }
-    }
-
-    @Override
-    public SparkListener getJobMetricsListener(Map<Integer, int[]> 
jobIdToStageId,
-                                                          Map<Integer, 
Integer> stageIdToJobId,
-                                                          Map<Integer, 
Map<String, List<TaskMetrics>>> allJobMetrics,
-                                                          Set<Integer> 
finishedJobIds) {
-        return new JobMetricsListener(jobIdToStageId, stageIdToJobId, 
allJobMetrics, finishedJobIds);
-    }
-
-    @Override
-    public void addSparkListener(SparkContext sc, SparkListener sparkListener) 
{
-        sc.addSparkListener(sparkListener);
-    }
-}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java Tue May 14 20:37:37 2024
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.spark;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.spark.Spark2JobStats;
-import org.apache.pig.tools.pigstats.spark.SparkJobStats;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.rdd.PartitionCoalescer;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import scala.Option;
-import scala.Tuple2;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class Spark2Shims extends SparkShims {
-    @Override
-    public <T, R> FlatMapFunction flatMapFunction(final 
FlatMapFunctionAdapter<T, R> function) {
-        return new FlatMapFunction<T, R>() {
-            @Override
-            public Iterator<R> call(T t) throws Exception {
-                return function.call(t);
-            }
-        };
-    }
-
-    @Override
-    public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final 
PairFlatMapFunctionAdapter<T, K, V> function) {
-        return new PairFlatMapFunction<T, K, V>() {
-            @Override
-            public Iterator<Tuple2<K, V>> call(T t) throws Exception {
-                return function.call(t);
-            }
-        };
-    }
-
-    @Override
-    public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean 
shuffle) {
-        return rdd.coalesce(numPartitions, shuffle, 
Option.<PartitionCoalescer>empty(), null);
-    }
-
-    @Override
-    public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, 
Configuration conf) {
-        return new Spark2JobStats(jobId, plan, conf);
-    }
-
-    @Override
-    public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, 
Configuration conf) {
-        return new Spark2JobStats(jobId, plan, conf);
-    }
-
-    @Override
-    public <T> OptionalWrapper<T> wrapOptional(T tuple) {
-        final Optional<T> t = (Optional<T>) tuple;
-
-        return new OptionalWrapper<T>() {
-            @Override
-            public boolean isPresent() {
-                return t.isPresent();
-            }
-
-            @Override
-            public T get() {
-                return t.get();
-            }
-        };
-    }
-
-    private static class JobMetricsListener extends SparkListener {
-        private final Log LOG = LogFactory.getLog(JobMetricsListener.class);
-
-        private Map<Integer, int[]> jobIdToStageId;
-        private Map<Integer, Integer> stageIdToJobId;
-        private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics;
-        private Set<Integer> finishedJobIds;
-
-        JobMetricsListener(final Map<Integer, int[]> jobIdToStageId,
-                           final Map<Integer, Integer> stageIdToJobId,
-                           final Map<Integer, Map<String, List<TaskMetrics>>> 
allJobMetrics,
-                           final Set<Integer> finishedJobIds) {
-            this.jobIdToStageId = jobIdToStageId;
-            this.stageIdToJobId = stageIdToJobId;
-            this.allJobMetrics = allJobMetrics;
-            this.finishedJobIds = finishedJobIds;
-        }
-
-        @Override
-        public synchronized void onStageCompleted(SparkListenerStageCompleted 
stageCompleted) {
-            int stageId = stageCompleted.stageInfo().stageId();
-            int stageAttemptId = stageCompleted.stageInfo().attemptId();
-            String stageIdentifier = stageId + "_" + stageAttemptId;
-            Integer jobId = stageIdToJobId.get(stageId);
-            if (jobId == null) {
-                LOG.warn("Cannot find job id for stage[" + stageId + "].");
-            } else {
-                Map<String, List<TaskMetrics>> jobMetrics = 
allJobMetrics.get(jobId);
-                if (jobMetrics == null) {
-                    jobMetrics = Maps.newHashMap();
-                    allJobMetrics.put(jobId, jobMetrics);
-                }
-                List<TaskMetrics> stageMetrics = 
jobMetrics.get(stageIdentifier);
-                if (stageMetrics == null) {
-                    stageMetrics = Lists.newLinkedList();
-                    jobMetrics.put(stageIdentifier, stageMetrics);
-                }
-                stageMetrics.add(stageCompleted.stageInfo().taskMetrics());
-            }
-        }
-
-        @Override
-        public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-            int jobId = jobStart.jobId();
-            int size = jobStart.stageIds().size();
-            int[] intStageIds = new int[size];
-            for (int i = 0; i < size; i++) {
-                Integer stageId = (Integer) jobStart.stageIds().apply(i);
-                intStageIds[i] = stageId;
-                stageIdToJobId.put(stageId, jobId);
-            }
-            jobIdToStageId.put(jobId, intStageIds);
-        }
-
-        @Override
-        public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
-            finishedJobIds.add(jobEnd.jobId());
-            notify();
-        }
-    }
-
-    @Override
-    public SparkListener getJobMetricsListener(Map<Integer, int[]> 
jobIdToStageId,
-                                               Map<Integer, Integer> 
stageIdToJobId,
-                                               Map<Integer, Map<String, 
List<TaskMetrics>>> allJobMetrics,
-                                               Set<Integer> finishedJobIds) {
-        return new JobMetricsListener(jobIdToStageId, stageIdToJobId, 
allJobMetrics, finishedJobIds);
-    }
-
-    @Override
-    public void addSparkListener(SparkContext sc, SparkListener sparkListener) 
{
-        sc.addSparkListener(sparkListener);
-    }
-
-}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue May 14 20:37:37 2024
@@ -144,8 +144,6 @@ import org.apache.spark.scheduler.StatsR
 
 import com.google.common.base.Joiner;
 
-import static 
org.apache.pig.backend.hadoop.executionengine.spark.SparkShims.SPARK_VERSION;
-
 /**
  * Main class that launches pig for Spark
  */
@@ -598,6 +596,12 @@ public class SparkLauncher extends Launc
             // HTTP file server doesn't have this restriction, it overwrites 
the file if added twice
             String useNettyFileServer = 
pigCtxtProperties.getProperty(PigConfiguration.PIG_SPARK_USE_NETTY_FILESERVER, 
"false");
             sparkConf.set("spark.rpc.useNettyFileServer", useNettyFileServer);
+            // Somehow ApplicationMaster stopped picking up 
__spark_hadoop_conf__.xml 
+            // for unit tests in Spark3.  Passing through spark_conf
+            
sparkConf.set("spark.hadoop.yarn.resourcemanager.scheduler.address",
+                  jobConf.get("yarn.resourcemanager.scheduler.address"));
+            sparkConf.set("spark.hadoop.yarn.resourcemanager.address",
+                  jobConf.get("yarn.resourcemanager.address"));
 
             if (sparkHome != null && !sparkHome.isEmpty()) {
                 sparkConf.setSparkHome(sparkHome);
@@ -645,11 +649,11 @@ public class SparkLauncher extends Launc
             checkAndConfigureDynamicAllocation(master, sparkConf);
 
             sparkContext = new JavaSparkContext(sparkConf);
-            SparkShims.getInstance().addSparkListener(sparkContext.sc(), 
jobStatisticCollector.getSparkListener());
-            SparkShims.getInstance().addSparkListener(sparkContext.sc(), new 
StatsReportListener());
+            
sparkContext.sc().addSparkListener(jobStatisticCollector.getSparkListener());
+            sparkContext.sc().addSparkListener(new StatsReportListener());
             allCachedFiles = new HashSet<String>();
         }
-        jobConf.set(SPARK_VERSION, sparkContext.version());
+        jobConf.set("pig.spark.version", sparkContext.version());
     }
 
     private static void checkAndConfigureDynamicAllocation(String master, 
SparkConf sparkConf) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java Tue May 14 20:37:37 2024
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.spark;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.spark.SparkJobStats;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.scheduler.SparkListener;
-
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public abstract class SparkShims implements Serializable {
-    private static final Log LOG = LogFactory.getLog(SparkShims.class);
-    public static final String SPARK_VERSION = "pig.spark.version";
-
-    private static SparkShims sparkShims;
-
-    private static SparkShims loadShims(String sparkVersion) throws 
ReflectiveOperationException {
-        Class<?> sparkShimsClass;
-
-        if ("2".equals(sparkVersion)) {
-            LOG.info("Initializing shims for Spark 2.x");
-            sparkShimsClass = 
Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark2Shims");
-        } else {
-            LOG.info("Initializing shims for Spark 1.x");
-            sparkShimsClass = 
Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark1Shims");
-        }
-
-        Constructor c = sparkShimsClass.getConstructor();
-        return (SparkShims) c.newInstance();
-    }
-
-    public static SparkShims getInstance() {
-        if (sparkShims == null) {
-            String sparkVersion;
-            if (UDFContext.getUDFContext().isFrontend()) {
-                sparkVersion = SparkContext.getOrCreate().version();
-            } else {
-                sparkVersion = 
UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, "");
-            }
-            LOG.info("Initializing SparkShims for Spark version: " + 
sparkVersion);
-            String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
-            try {
-                sparkShims = loadShims(sparkMajorVersion);
-            } catch (ReflectiveOperationException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return sparkShims;
-    }
-
-    private static String getSparkMajorVersion(String sparkVersion) {
-        return sparkVersion.startsWith("2") ? "2" : "1";
-    }
-
-    public abstract <T, R> FlatMapFunction<T, R> 
flatMapFunction(FlatMapFunctionAdapter<T, R> function);
-
-    public abstract <T, K, V> PairFlatMapFunction<T, K, V> 
pairFlatMapFunction(PairFlatMapFunctionAdapter<T, K, V> function);
-
-    public abstract RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, 
boolean shuffle);
-
-    public abstract SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph 
plan, Configuration conf);
-
-    public abstract SparkJobStats sparkJobStats(String jobId, 
PigStats.JobGraph plan, Configuration conf);
-
-    public abstract <T> OptionalWrapper<T> wrapOptional(T tuple);
-
-    public abstract SparkListener getJobMetricsListener(Map<Integer, int[]> 
jobIdToStageId,
-                                                        Map<Integer, Integer> 
stageIdToJobId,
-                                                        Map<Integer, 
Map<String, List<TaskMetrics>>> allJobMetrics,
-                                                        Set<Integer> 
finishedJobIds);
-
-    public abstract void addSparkListener(SparkContext sc, SparkListener 
sparkListener);
-
-    public interface OptionalWrapper<T> {
-        boolean isPresent();
-
-        T get();
-    }
-}
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Tue May 14 20:37:37 2024
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -54,6 +55,8 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.rdd.RDD;
 
@@ -155,6 +158,22 @@ public class SparkUtil {
         baseSparkOp.physicalPlan.addAsLeaf(sort);
     }
 
+    public static <T, R> FlatMapFunction flatMapFunction(final 
FlatMapFunctionAdapter<T, R> function) {
+        return new FlatMapFunction<T, R>() {
+            @Override
+            public Iterator<R> call(T t) throws Exception {
+                return function.call(t);
+            }
+        };
+    }
+
+    public static <T, K, V> PairFlatMapFunction<T, K, V> 
pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) {
+        return new PairFlatMapFunction<T, K, V>() {
+            @Override
+            public Iterator<Tuple2<K, V>> call(T t) throws Exception {
+                return function.call(t);
+            }
+        };
+    }
 
-
-}
\ No newline at end of file
+}
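
With SparkShims gone, the converters below obtain their FlatMapFunction through SparkUtil instead. A minimal, hypothetical usage sketch of the new helper follows; the IdentityFunction adapter and IdentityConverterSketch class are illustrative only and not part of the commit:

// Illustrative only: shows how a converter-style FlatMapFunctionAdapter is wrapped
// by SparkUtil.flatMapFunction into Spark's Iterator-based FlatMapFunction.
import java.util.Iterator;

import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.spark.rdd.RDD;

public class IdentityConverterSketch {

    // Pass-through adapter; real converters (e.g. LimitFunction, ForEachFunction)
    // run their physical operator over the partition iterator here.
    private static class IdentityFunction
            implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> {
        @Override
        public Iterator<Tuple> call(Iterator<Tuple> input) {
            return input;
        }
    }

    public static RDD<Tuple> convert(RDD<Tuple> rdd) {
        // Same call pattern as the converter diffs below.
        return rdd.toJavaRDD()
                .mapPartitions(SparkUtil.flatMapFunction(new IdentityFunction()), true)
                .rdd();
    }
}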

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Tue May 14 20:37:37 2024
@@ -25,7 +25,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.rdd.RDD;
@@ -40,8 +39,8 @@ public class CollectedGroupConverter imp
         RDD<Tuple> rdd = predecessors.get(0);
         CollectedGroupFunction collectedGroupFunction
                 = new CollectedGroupFunction(physicalOperator);
-        return 
rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(collectedGroupFunction),
 true)
-                .rdd();
+        return rdd.toJavaRDD().mapPartitions(
+              SparkUtil.flatMapFunction(collectedGroupFunction), true).rdd();
     }
 
     private static class CollectedGroupFunction

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Tue May 14 20:37:37 2024
@@ -34,7 +34,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.rdd.RDD;
@@ -54,7 +53,7 @@ public class FRJoinConverter implements
         attachReplicatedInputs((POFRJoinSpark) poFRJoin);
 
         FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin);
-        return 
rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(frJoinFunction),
 true).rdd();
+        return 
rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(frJoinFunction), 
true).rdd();
     }
 
     private void attachReplicatedInputs(POFRJoinSpark poFRJoin) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Tue May 14 20:37:37 2024
@@ -31,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
 import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
@@ -61,7 +60,7 @@ public class ForEachConverter implements
         RDD<Tuple> rdd = predecessors.get(0);
         ForEachFunction forEachFunction = new 
ForEachFunction(physicalOperator, confBytes);
 
-        return 
rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(forEachFunction),
 true).rdd();
+        return 
rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(forEachFunction), 
true).rdd();
     }
 
     private static class ForEachFunction implements

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Tue May 14 20:37:37 2024
@@ -25,11 +25,13 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.PartitionCoalescer;
 import org.apache.spark.rdd.RDD;
 
+import scala.Option;
+
 @SuppressWarnings({ "serial" })
 public class LimitConverter implements RDDConverter<Tuple, Tuple, POLimit> {
 
@@ -39,8 +41,8 @@ public class LimitConverter implements R
         SparkUtil.assertPredecessorSize(predecessors, poLimit, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         LimitFunction limitFunction = new LimitFunction(poLimit);
-        RDD<Tuple> rdd2 = SparkShims.getInstance().coalesce(rdd, 1, false);
-        return 
rdd2.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(limitFunction),
 false).rdd();
+        RDD<Tuple> rdd2 = rdd.coalesce(1, false, 
Option.<PartitionCoalescer>empty(), null);
+        return 
rdd2.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(limitFunction), 
false).rdd();
     }
 
     private static class LimitFunction implements 
FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java Tue May 14 20:37:37 2024
@@ -25,7 +25,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.rdd.RDD;
@@ -38,7 +37,7 @@ public class MergeCogroupConverter imple
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         MergeCogroupFunction mergeCogroupFunction = new 
MergeCogroupFunction(physicalOperator);
-        return 
rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeCogroupFunction),
 true).rdd();
+        return 
rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(mergeCogroupFunction), 
true).rdd();
     }
 
     private static class MergeCogroupFunction implements

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Tue May 14 20:37:37 2024
@@ -26,7 +26,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.rdd.RDD;
@@ -44,7 +43,7 @@ public class MergeJoinConverter implemen
         RDD<Tuple> rdd = predecessors.get(0);
         MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin);
 
-        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeJoinFunction), true).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(mergeJoinFunction), true).rdd();
     }
 
     private static class MergeJoinFunction implements

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java Tue May 14 20:37:37 2024
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
 import org.apache.pig.data.Tuple;
@@ -38,7 +37,7 @@ public class PoissonSampleConverter impl
         SparkUtil.assertPredecessorSize(predecessors, po, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         PoissionSampleFunction poissionSampleFunction = new PoissionSampleFunction(po);
-        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(poissionSampleFunction), false).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(poissionSampleFunction), false).rdd();
     }
 
     private static class PoissionSampleFunction implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Tue May 14 20:37:37 2024
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.Objects;
 
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import scala.Tuple2;
 
@@ -57,7 +56,7 @@ public class SecondaryKeySortUtil {
         JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
                 new IndexedKeyPartitioner(partitionNums));
         //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
-        return sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(new AccumulateByKey(pkgOp)), true).rdd();
+        return sorted.mapPartitions(SparkUtil.flatMapFunction(new AccumulateByKey(pkgOp)), true).rdd();
     }
 
     //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Tue May 14 20:37:37 2024
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.util.Pair;
@@ -55,6 +54,7 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.Optional;
 import org.apache.spark.rdd.RDD;
 
 public class SkewedJoinConverter implements
@@ -103,7 +103,7 @@ public class SkewedJoinConverter impleme
 
         // with partition id
         StreamPartitionIndexKeyFunction streamFun = new StreamPartitionIndexKeyFunction(this, keyDist, defaultParallelism);
-        JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(SparkShims.getInstance().flatMapFunction(streamFun));
+        JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(SparkUtil.flatMapFunction(streamFun));
 
         // Tuple2 RDD to Pair RDD
         JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD = new JavaPairRDD<PartitionIndexedKey, Tuple>(
@@ -187,8 +187,7 @@ public class SkewedJoinConverter impleme
 
                             Tuple leftTuple = tf.newTuple();
                             if (!innerFlags[0]) {
-                                // left should be Optional<Tuple>
-                                SparkShims.OptionalWrapper<L> leftOption = SparkShims.getInstance().wrapOptional(left);
+                                Optional<Tuple> leftOption = (Optional<Tuple>) left;
                                 if (!leftOption.isPresent()) {
                                     // Add an empty left record for RIGHT OUTER JOIN.
                                     // Notice: if it is a skewed, only join the first reduce key
@@ -200,7 +199,7 @@ public class SkewedJoinConverter impleme
                                         return this.next();
                                     }
                                 } else {
-                                    leftTuple = (Tuple) leftOption.get();
+                                    leftTuple = leftOption.get();
                                 }
                             } else {
                                 leftTuple = (Tuple) left;
@@ -211,14 +210,13 @@ public class SkewedJoinConverter impleme
 
                             Tuple rightTuple = tf.newTuple();
                             if (!innerFlags[1]) {
-                                // right should be Optional<Tuple>
-                                SparkShims.OptionalWrapper<R> rightOption = SparkShims.getInstance().wrapOptional(right);
+                                Optional<Tuple> rightOption = (Optional<Tuple>) right;
                                 if (!rightOption.isPresent()) {
                                     for (int i = 0; i < schemaSize[1]; i++) {
                                         rightTuple.append(null);
                                     }
                                 } else {
-                                    rightTuple = (Tuple) rightOption.get();
+                                    rightTuple = rightOption.get();
                                 }
                             } else {
                                 rightTuple = (Tuple) right;
@@ -608,22 +606,22 @@ public class SkewedJoinConverter impleme
             JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
                     join(streamIndexedJavaPairRDD, partitioner);
 
-            return resultKeyValue.mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
+            return resultKeyValue.mapPartitions(SparkUtil.flatMapFunction(toValueFun));
         } else if (innerFlags[0] && !innerFlags[1]) {
             // left outer join
             return skewIndexedJavaPairRDD
                     .leftOuterJoin(streamIndexedJavaPairRDD, partitioner)
-                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
+                    .mapPartitions(SparkUtil.flatMapFunction(toValueFun));
         } else if (!innerFlags[0] && innerFlags[1]) {
             // right outer join
             return skewIndexedJavaPairRDD
                     .rightOuterJoin(streamIndexedJavaPairRDD, partitioner)
-                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
+                    .mapPartitions(SparkUtil.flatMapFunction(toValueFun));
         } else {
             // full outer join
             return skewIndexedJavaPairRDD
                     .fullOuterJoin(streamIndexedJavaPairRDD, partitioner)
-                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
+                    .mapPartitions(SparkUtil.flatMapFunction(toValueFun));
         }
     }
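
With the shim's OptionalWrapper gone, the outer-join branches above depend on
org.apache.spark.api.java.Optional, which Spark 2 and 3 use for the possibly-missing side
of leftOuterJoin/rightOuterJoin/fullOuterJoin results. A small illustrative sketch of the
unwrap pattern (the helper name is not from the patch):

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.spark.api.java.Optional;

    public class OptionalSideSketch {
        // Return the joined tuple when the side is present, otherwise an empty
        // tuple placeholder, mirroring the null-padding done in the converter.
        static Tuple tupleOrEmpty(Optional<Tuple> side) {
            return side.isPresent() ? side.get() : TupleFactory.getInstance().newTuple();
        }
    }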
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Tue May 14 20:37:37 2024
@@ -24,7 +24,6 @@ import java.util.List;
 
 import 
org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
@@ -58,7 +57,7 @@ public class SortConverter implements RD
 
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
                 sortOperator.getMComparator(), true, parallelism);
-        JavaRDD<Tuple> mapped = sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(TO_VALUE_FUNCTION));
+        JavaRDD<Tuple> mapped = sorted.mapPartitions(SparkUtil.flatMapFunction(TO_VALUE_FUNCTION));
 
         return mapped.rdd();
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java Tue May 14 20:37:37 2024
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import 
org.apache.pig.backend.hadoop.executionengine.spark.PairFlatMapFunctionAdapter;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.spark.api.java.function.Function2;
 import scala.Tuple2;
@@ -65,7 +64,7 @@ public class SparkSampleSortConverter im
          //sort sample data
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
          //convert every element in sample data from element to (all, element) format
-        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(SparkShims.getInstance().pairFlatMapFunction(new AggregateFunction()));
+        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(SparkUtil.pairFlatMapFunction(new AggregateFunction()));
         //use groupByKey to aggregate all values( the format will be ((all),{(sampleEle1),(sampleEle2),...} )
         JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new ToValueFunction());
         return  groupByKey.rdd();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Tue May 14 20:37:37 2024
@@ -38,7 +38,7 @@ public class StreamConverter implements
         SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         StreamFunction streamFunction = new StreamFunction(poStream);
-        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(streamFunction), true).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(streamFunction), true).rdd();
     }
 
     private static class StreamFunction implements

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java Tue May 14 20:37:37 2024
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.tools.pigstats.spark;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.spark.executor.ShuffleReadMetrics;
-import org.apache.spark.executor.ShuffleWriteMetrics;
-import org.apache.spark.executor.TaskMetrics;
-import scala.Option;
-
-import java.util.List;
-import java.util.Map;
-
-public class Spark1JobStats extends SparkJobStats {
-    public Spark1JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
-        super(jobId, plan, conf);
-    }
-
-    public Spark1JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
-        super(jobId, plan, conf);
-    }
-
-    @Override
-    protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
-        Map<String, Long> results = Maps.newLinkedHashMap();
-
-        long executorDeserializeTime = 0;
-        long executorRunTime = 0;
-        long resultSize = 0;
-        long jvmGCTime = 0;
-        long resultSerializationTime = 0;
-        long memoryBytesSpilled = 0;
-        long diskBytesSpilled = 0;
-        long bytesRead = 0;
-        long bytesWritten = 0;
-        long remoteBlocksFetched = 0;
-        long localBlocksFetched = 0;
-        long fetchWaitTime = 0;
-        long remoteBytesRead = 0;
-        long shuffleBytesWritten = 0;
-        long shuffleWriteTime = 0;
-        boolean inputMetricExist = false;
-        boolean outputMetricExist = false;
-        boolean shuffleReadMetricExist = false;
-        boolean shuffleWriteMetricExist = false;
-
-        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-            if (stageMetric != null) {
-                for (TaskMetrics taskMetrics : stageMetric) {
-                    if (taskMetrics != null) {
-                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
-                        executorRunTime += taskMetrics.executorRunTime();
-                        resultSize += taskMetrics.resultSize();
-                        jvmGCTime += taskMetrics.jvmGCTime();
-                        resultSerializationTime += taskMetrics.resultSerializationTime();
-                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
-                        if (!taskMetrics.inputMetrics().isEmpty()) {
-                            inputMetricExist = true;
-                            bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-                        }
-
-                        if (!taskMetrics.outputMetrics().isEmpty()) {
-                            outputMetricExist = true;
-                            bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
-                        }
-
-                        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-                        if (!shuffleReadMetricsOption.isEmpty()) {
-                            shuffleReadMetricExist = true;
-                            remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
-                            localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
-                            fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
-                            remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
-                        }
-
-                        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-                        if (!shuffleWriteMetricsOption.isEmpty()) {
-                            shuffleWriteMetricExist = true;
-                            shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
-                            shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
-                        }
-
-                    }
-                }
-            }
-        }
-
-        results.put("ExcutorDeserializeTime", executorDeserializeTime);
-        results.put("ExecutorRunTime", executorRunTime);
-        results.put("ResultSize", resultSize);
-        results.put("JvmGCTime", jvmGCTime);
-        results.put("ResultSerializationTime", resultSerializationTime);
-        results.put("MemoryBytesSpilled", memoryBytesSpilled);
-        results.put("DiskBytesSpilled", diskBytesSpilled);
-        if (inputMetricExist) {
-            results.put("BytesRead", bytesRead);
-            hdfsBytesRead = bytesRead;
-            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
-        }
-
-        if (outputMetricExist) {
-            results.put("BytesWritten", bytesWritten);
-            hdfsBytesWritten = bytesWritten;
-            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
-        }
-
-        if (shuffleReadMetricExist) {
-            results.put("RemoteBlocksFetched", remoteBlocksFetched);
-            results.put("LocalBlocksFetched", localBlocksFetched);
-            results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
-            results.put("FetchWaitTime", fetchWaitTime);
-            results.put("RemoteBytesRead", remoteBytesRead);
-        }
-
-        if (shuffleWriteMetricExist) {
-            results.put("ShuffleBytesWritten", shuffleBytesWritten);
-            results.put("ShuffleWriteTime", shuffleWriteTime);
-        }
-
-        return results;
-    }
-}

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java Tue May 14 20:37:37 2024
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.tools.pigstats.spark;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.spark.executor.ShuffleReadMetrics;
-import org.apache.spark.executor.ShuffleWriteMetrics;
-import org.apache.spark.executor.TaskMetrics;
-
-import java.util.List;
-import java.util.Map;
-
-public class Spark2JobStats extends SparkJobStats {
-    public Spark2JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
-        super(jobId, plan, conf);
-    }
-
-    public Spark2JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
-        super(jobId, plan, conf);
-    }
-
-    @Override
-    protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
-        Map<String, Long> results = Maps.newLinkedHashMap();
-
-        long executorDeserializeTime = 0;
-        long executorRunTime = 0;
-        long resultSize = 0;
-        long jvmGCTime = 0;
-        long resultSerializationTime = 0;
-        long memoryBytesSpilled = 0;
-        long diskBytesSpilled = 0;
-        long bytesRead = 0;
-        long bytesWritten = 0;
-        long remoteBlocksFetched = 0;
-        long localBlocksFetched = 0;
-        long fetchWaitTime = 0;
-        long remoteBytesRead = 0;
-        long shuffleBytesWritten = 0;
-        long shuffleWriteTime = 0;
-
-        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-            if (stageMetric != null) {
-                for (TaskMetrics taskMetrics : stageMetric) {
-                    if (taskMetrics != null) {
-                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
-                        executorRunTime += taskMetrics.executorRunTime();
-                        resultSize += taskMetrics.resultSize();
-                        jvmGCTime += taskMetrics.jvmGCTime();
-                        resultSerializationTime += taskMetrics.resultSerializationTime();
-                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
-                        bytesRead += taskMetrics.inputMetrics().bytesRead();
-
-                        bytesWritten += taskMetrics.outputMetrics().bytesWritten();
-
-                        ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-                        remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched();
-                        localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched();
-                        fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime();
-                        remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead();
-
-                        ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-                        shuffleBytesWritten += shuffleWriteMetricsOption.shuffleBytesWritten();
-                        shuffleWriteTime += shuffleWriteMetricsOption.shuffleWriteTime();
-                    }
-                }
-            }
-        }
-
-        results.put("ExcutorDeserializeTime", executorDeserializeTime);
-        results.put("ExecutorRunTime", executorRunTime);
-        results.put("ResultSize", resultSize);
-        results.put("JvmGCTime", jvmGCTime);
-        results.put("ResultSerializationTime", resultSerializationTime);
-        results.put("MemoryBytesSpilled", memoryBytesSpilled);
-        results.put("DiskBytesSpilled", diskBytesSpilled);
-
-        results.put("BytesRead", bytesRead);
-        hdfsBytesRead = bytesRead;
-        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
-
-        results.put("BytesWritten", bytesWritten);
-        hdfsBytesWritten = bytesWritten;
-        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
-
-        results.put("RemoteBlocksFetched", remoteBlocksFetched);
-        results.put("LocalBlocksFetched", localBlocksFetched);
-        results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
-        results.put("FetchWaitTime", fetchWaitTime);
-        results.put("RemoteBytesRead", remoteBytesRead);
-
-        results.put("ShuffleBytesWritten", shuffleBytesWritten);
-        results.put("ShuffleWriteTime", shuffleWriteTime);
-
-        return results;
-    }
-}

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Tue May 14 20:37:37 2024
@@ -35,11 +35,13 @@ import org.apache.pig.tools.pigstats.Job
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
 
 import com.google.common.collect.Maps;
 
-public abstract class SparkJobStats extends JobStats {
+public class SparkJobStats extends JobStats {
 
     private int jobId;
     private Map<String, Long> stats = Maps.newLinkedHashMap();
@@ -110,8 +112,82 @@ public abstract class SparkJobStats exte
         }
     }
 
-    protected abstract Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric);
+    protected Map<String, Long> combineTaskMetrics(
+                    Map<String, List<TaskMetrics>> jobMetric) {
+        Map<String, Long> results = Maps.newLinkedHashMap();
+
+        long executorDeserializeTime = 0;
+        long executorRunTime = 0;
+        long resultSize = 0;
+        long jvmGCTime = 0;
+        long resultSerializationTime = 0;
+        long memoryBytesSpilled = 0;
+        long diskBytesSpilled = 0;
+        long bytesRead = 0;
+        long bytesWritten = 0;
+        long remoteBlocksFetched = 0;
+        long localBlocksFetched = 0;
+        long fetchWaitTime = 0;
+        long remoteBytesRead = 0;
+        long shuffleBytesWritten = 0;
+        long shuffleWriteTime = 0;
+
+        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+            if (stageMetric != null) {
+                for (TaskMetrics taskMetrics : stageMetric) {
+                    if (taskMetrics != null) {
+                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+                        executorRunTime += taskMetrics.executorRunTime();
+                        resultSize += taskMetrics.resultSize();
+                        jvmGCTime += taskMetrics.jvmGCTime();
+                        resultSerializationTime += taskMetrics.resultSerializationTime();
+                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+                        bytesRead += taskMetrics.inputMetrics().bytesRead();
+
+                        bytesWritten += taskMetrics.outputMetrics().bytesWritten();
+
+                        ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+                        remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched();
+                        localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched();
+                        fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime();
+                        remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead();
+
+                        ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+                        shuffleBytesWritten += shuffleWriteMetricsOption.bytesWritten();
+                        shuffleWriteTime += shuffleWriteMetricsOption.writeTime();
+                    }
+                }
+            }
+        }
+
+        results.put("ExcutorDeserializeTime", executorDeserializeTime);
+        results.put("ExecutorRunTime", executorRunTime);
+        results.put("ResultSize", resultSize);
+        results.put("JvmGCTime", jvmGCTime);
+        results.put("ResultSerializationTime", resultSerializationTime);
+        results.put("MemoryBytesSpilled", memoryBytesSpilled);
+        results.put("DiskBytesSpilled", diskBytesSpilled);
+
+        results.put("BytesRead", bytesRead);
+        hdfsBytesRead = bytesRead;
+        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+
+        results.put("BytesWritten", bytesWritten);
+        hdfsBytesWritten = bytesWritten;
+        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+
+        results.put("RemoteBlocksFetched", remoteBlocksFetched);
+        results.put("LocalBlocksFetched", localBlocksFetched);
+        results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+        results.put("FetchWaitTime", fetchWaitTime);
+        results.put("RemoteBytesRead", remoteBytesRead);
 
+        results.put("ShuffleBytesWritten", shuffleBytesWritten);
+        results.put("ShuffleWriteTime", shuffleWriteTime);
+
+        return results;
+    }
     public Map<String, Long> getStats() {
         return stats;
     }
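
The single combineTaskMetrics implementation above assumes the Spark 2/3 TaskMetrics API:
inputMetrics(), outputMetrics(), shuffleReadMetrics() and shuffleWriteMetrics() return
plain metric objects rather than scala.Option values, and the shuffle write accessors are
bytesWritten()/writeTime(). A short aggregation in that style (the helper and variable
names are illustrative, not from the patch):

    import org.apache.spark.executor.ShuffleWriteMetrics;
    import org.apache.spark.executor.TaskMetrics;

    public class TaskMetricsSketch {
        // Sum shuffle bytes written across tasks using the Spark 2/3 accessors.
        static long totalShuffleBytesWritten(Iterable<TaskMetrics> tasks) {
            long total = 0;
            for (TaskMetrics tm : tasks) {
                if (tm == null) {
                    continue;
                }
                ShuffleWriteMetrics w = tm.shuffleWriteMetrics(); // no Option wrapper
                total += w.bytesWritten(); // was shuffleBytesWritten() in Spark 1
            }
            return total;
        }
    }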


