Author: knoguchi
Date: Tue May 14 20:37:37 2024
New Revision: 1917723

URL: http://svn.apache.org/viewvc?rev=1917723&view=rev
Log:
PIG-5439: Support Spark 3 and drop SparkShim (knoguchi)
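The bulk of the diff that follows repeats one pattern: the reflective SparkShims layer, which existed only because Spark 1's Iterable-based FlatMapFunction and Spark 2's Iterator-based one could not share code, is deleted, and the converters call static helpers on SparkUtil directly, since Spark 2 and Spark 3 expose the same Iterator-based API. A minimal sketch of the resulting call pattern, using only the FlatMapFunctionAdapter interface and the SparkUtil.flatMapFunction helper shown in the hunks below; the pass-through adapter and the class name are illustrative, not part of the commit:

    import java.util.Iterator;

    import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
    import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
    import org.apache.pig.data.Tuple;
    import org.apache.spark.rdd.RDD;

    public class ExampleConverter {
        // Pass-through adapter; real converters (ForEach, Limit, Stream, ...) put
        // their operator logic here. Illustrative only.
        private static class PassThroughFunction
                implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> {
            @Override
            public Iterator<Tuple> call(Iterator<Tuple> input) {
                return input;
            }
        }

        public RDD<Tuple> convert(RDD<Tuple> rdd) {
            // Spark 2 and 3 both take an Iterator-returning FlatMapFunction, so one
            // static wrapper replaces the per-version shim lookup
            // (SparkShims.getInstance().flatMapFunction(...)).
            return rdd.toJavaRDD()
                    .mapPartitions(SparkUtil.flatMapFunction(new PassThroughFunction()), true)
                    .rdd();
        }
    }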
Modified: pig/trunk/CHANGES.txt pig/trunk/build.xml pig/trunk/ivy.xml pig/trunk/ivy/libraries-h2.properties pig/trunk/ivy/libraries-h3.properties pig/trunk/ivy/libraries.properties pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java pig/trunk/test/org/apache/pig/test/TestStoreBase.java pig/trunk/test/org/apache/pig/test/Util.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Tue May 14 20:37:37 2024 @@ -24,6 +24,7 @@ INCOMPATIBLE CHANGES IMPROVEMENTS PIG-5438: Update SparkCounter.Accumulator to AccumulatorV2 (knoguchi) +PIG-5439: Support Spark 3 and drop SparkShim (knoguchi) OPTIMIZATIONS Modified: pig/trunk/build.xml URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/build.xml (original) +++ pig/trunk/build.xml Tue May 14 20:37:37 2024 @@ -245,13 +245,9 @@ </then> </if> <property name="hbaseversion" value="1" /> - <property name="sparkversion" value="1" /> + <property name="sparkversion" value="2" /> <property name="hiveversion" value="1" /> - <condition property="src.exclude.dir" value="**/Spark2*.java" else="**/Spark1*.java"> - <equals arg1="${sparkversion}" arg2="1"/> - </condition> - <loadproperties srcfile="${ivy.dir}/libraries.properties"/> <property name="src.shims.dir" value="${basedir}/shims/src/hadoop2" /> @@ -586,7 
+582,7 @@ </copy> </target> - <target name="compile-test" depends="jar-simple, ivy-test"> + <target name="compile-test" depends="jar, ivy-test"> <echo>*** Building Test Sources ***</echo> <echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo> <echo>*** Else, you will only be warned about deprecations ***</echo> @@ -710,7 +706,7 @@ <!-- ================================================================== --> <!-- Make pig.jar --> <!-- ================================================================== --> - <target name="jar-simple" depends="compile,ivy-buildJar" description="Create pig core jar"> + <target name="jar" depends="compile,ivy-buildJar" description="Create pig core jar"> <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/> <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.withouthadoop}" includedJars="runtime.dependencies-withouthadoop.jar"/> <antcall target="copyCommonDependencies"/> @@ -764,6 +760,7 @@ <mkdir dir="${spark.lib.dir}" /> <copy todir="${spark.lib.dir}"> <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/> + <fileset dir="${ivy.lib.dir}" includes="jackson-*.jar"/> </copy> </target> @@ -858,44 +855,6 @@ <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/> </target> - <target name="jar" description="Create pig jar with Spark 1 and 2"> - <echo>Compiling against Spark 2</echo> - <antcall target="clean-deps" inheritRefs="true" inheritall="true"/> - <propertyreset name="sparkversion" value="2"/> - <propertyreset name="src.exclude.dir" value="**/Spark1*.java" /> - <antcall target="jar-core" inheritRefs="true" inheritall="true"/> - <move file="${output.jarfile.core}" tofile="${basedir}/_pig-shims.jar"/> - - <echo>Compiling against Spark 1</echo> - <antcall target="clean-deps" inheritRefs="true" inheritall="true"/> - <propertyreset name="sparkversion" value="1"/> - <propertyreset name="src.exclude.dir" value="**/Spark2*.java" /> - <antcall target="jar-simple" inheritRefs="true" inheritall="true"/> - <jar update="yes" jarfile="${output.jarfile.core}"> - <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> - </jar> - <if> - <equals arg1="${isHadoop2}" arg2="true" /> - <then> - <jar update="yes" jarfile="${output.jarfile.backcompat-core-h2}"> - <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> - </jar> - <jar update="yes" jarfile="${output.jarfile.withouthadoop-h2}"> - <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> - </jar> - </then> - <else> - <jar update="yes" jarfile="${output.jarfile.backcompat-core-h3}"> - <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> - </jar> - <jar update="yes" jarfile="${output.jarfile.withouthadoop-h3}"> - <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> - </jar> - </else> - </if> - <delete file="${basedir}/_pig-shims.jar"/> - </target> - <!-- ================================================================== --> <!-- macrodef: buildJar --> <!-- ================================================================== --> Modified: pig/trunk/ivy.xml URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/ivy.xml (original) +++ pig/trunk/ivy.xml Tue May 14 20:37:37 2024 @@ -42,8 +42,8 @@ <conf name="hadoop3" 
visibility="private"/> <conf name="hbase1" visibility="private"/> <conf name="hbase2" visibility="private"/> - <conf name="spark1" visibility="private" /> <conf name="spark2" visibility="private" /> + <conf name="spark3" visibility="private" /> <conf name="hive1" visibility="private"/> <conf name="hive3" visibility="private"/> <conf name="owasp" visibility="private" description="Artifacts required for owasp target"/> @@ -239,9 +239,14 @@ <dependency org="org.apache.ivy" name="ivy" rev="${ivy.version}" conf="compile->master"/> <dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="${jackson.version}" - conf="compile->master"/> + conf="compile->master"> + <exclude org="com.thoughtworks.paranamer" module="paranamer"/> + </dependency> <dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="${jackson.version}" - conf="compile->master"/> + conf="compile->master"> + <exclude org="com.thoughtworks.paranamer" module="paranamer"/> + </dependency> + <dependency org="com.thoughtworks.paranamer" name="paranamer" rev="${paranamer.version}" conf="compile->master"/> <dependency org="org.fusesource.jansi" name="jansi" rev="${jansi.version}" conf="compile->master"/> <dependency org="joda-time" name="joda-time" rev="${joda-time.version}" conf="compile->master"/> @@ -615,26 +620,43 @@ <dependency org="org.apache.parquet" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/> - <!-- for Spark 1.x integration --> - <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark1.version}" conf="spark1->default"> - <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/> - <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/> - <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/> - <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/> - <exclude org="org.apache.hadoop" /> - <exclude org="com.esotericsoftware.kryo" /> - <exclude org="jline" module="jline"/> - <exclude org="com.google.guava" /> + <!-- for Spark 2.x integration --> + <dependency org="org.apache.spark" name="spark-core_${spark2-scala.version}" rev="${spark2.version}" conf="spark2->default"> + <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/> + <exclude org="com.fasterxml.jackson.core" module="jackson-annotations"/> + <exclude org="com.fasterxml.jackson.core" module="jackson-core"/> + <exclude org="com.fasterxml.jackson.core" module="jackson-databind"/> + <exclude org="com.fasterxml.jackson.module" module="jackson-module-scala_${spark-scala.version}"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/> + <exclude org="org.apache.hadoop" /> + <exclude org="com.esotericsoftware.kryo" /> + <exclude org="jline" module="jline"/> + <exclude org="com.google.guava" /> + <exclude org="io.netty" module="netty"/> + <exclude org="io.netty" name="netty-all"/> </dependency> - <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark1.version}" conf="spark1->default"> - <exclude org="org.apache.hadoop" /> + <dependency org="org.apache.spark" name="spark-yarn_${spark2-scala.version}" rev="${spark2.version}" conf="spark2->default"> + <exclude org="org.apache.hadoop" /> </dependency> + <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark2->default;spark3->default"/> + <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" 
conf="test->master"/> + <dependency org="org.scala-lang.modules" name="scala-xml_${spark2-scala.version}" rev="${scala-xml.version}" conf="spark2->default"/> + <dependency org="com.fasterxml.jackson.module" name="jackson-module-scala_${spark2-scala.version}" + rev="${jackson-module-scala_spark2.version}" conf="spark2->default"/> + <dependency org="com.fasterxml.jackson.module" name="jackson-module-paranamer" + rev="${jackson-module-scala_spark2.version}" conf="spark2->default"/> <!-- for Spark 2.x integration --> - <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark2.version}" conf="spark2->default"> + <dependency org="org.apache.spark" name="spark-core_${spark3-scala.version}" rev="${spark3.version}" conf="spark3->default"> <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/> <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/> <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/> + <exclude org="com.fasterxml.jackson.core" module="jackson-annotations"/> + <exclude org="com.fasterxml.jackson.core" module="jackson-core"/> + <exclude org="com.fasterxml.jackson.core" module="jackson-databind"/> + <exclude org="com.fasterxml.jackson.module" module="jackson-module-scala_${spark-scala.version}"/> <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/> <exclude org="org.apache.hadoop" /> <exclude org="com.esotericsoftware.kryo" /> @@ -643,14 +665,18 @@ <exclude org="io.netty" module="netty"/> <exclude org="io.netty" name="netty-all"/> </dependency> - <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark2.version}" conf="spark2->default"> + <dependency org="org.apache.spark" name="spark-yarn_${spark3-scala.version}" rev="${spark3.version}" conf="spark3->default"> <exclude org="org.apache.hadoop" /> </dependency> + <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark3->default;spark3->default"/> + <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" conf="test->master"/> + <dependency org="org.scala-lang.modules" name="scala-xml_${spark3-scala.version}" rev="${scala-xml.version}" conf="spark3->default"/> + <dependency org="com.fasterxml.jackson.module" name="jackson-module-scala_${spark3-scala.version}" + rev="${jackson-module-scala_spark3.version}" conf="spark3->default"/> + <dependency org="com.fasterxml.jackson.module" name="jackson-module-paranamer" + rev="${jackson-module-scala_spark3.version}" conf="spark3->default"/> <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/> - <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark1->default;spark2->default"/> - <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" conf="test->master"/> - <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark1->default;spark2->default"/> <!-- for Tez integration --> <dependency org="org.apache.tez" name="tez" rev="${tez.version}" Modified: pig/trunk/ivy/libraries-h2.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries-h2.properties?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/ivy/libraries-h2.properties (original) +++ pig/trunk/ivy/libraries-h2.properties Tue May 14 20:37:37 2024 @@ -23,3 +23,5 @@ netty.version=3.10.6.Final netty-all.version=4.1.50.Final tez.version=0.9.2 servlet-api.version=2.5 +spark-scala.version=2.11 +jackson-module-scala.version=2.10.2 
Modified: pig/trunk/ivy/libraries-h3.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries-h3.properties?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/ivy/libraries-h3.properties (original) +++ pig/trunk/ivy/libraries-h3.properties Tue May 14 20:37:37 2024 @@ -25,3 +25,5 @@ netty-all.version=4.1.77.Final re2j.version=1.0 tez.version=0.10.2 servlet-api.version=3.1.0 +spark-scala.version=2.12 +jackson-module-scala.version=2.10.2 Modified: pig/trunk/ivy/libraries.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/ivy/libraries.properties (original) +++ pig/trunk/ivy/libraries.properties Tue May 14 20:37:37 2024 @@ -66,8 +66,12 @@ rats-lib.version=0.5.1 slf4j-api.version=1.7.36 slf4j-reload4j.version=1.7.36 reload4j.version=1.2.24 -spark1.version=1.6.1 spark2.version=2.4.8 +spark2-scala.version=2.11 +jackson-module-scala_spark2.version=2.9.10 +spark3.version=3.2.4 +spark3-scala.version=2.12 +jackson-module-scala_spark3.version=2.10.2 xerces.version=2.10.0 xalan.version=2.7.1 wagon-http.version=1.0-beta-2 @@ -96,3 +100,4 @@ roaring-bitmap-shaded.version=0.7.14 dependency-check-ant.version=7.4.4 woodstox.version=5.3.0 stax2-api.version=4.2.1 +paranamer.version=2.8 Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java Tue May 14 20:37:37 2024 @@ -21,6 +21,8 @@ package org.apache.pig.backend.hadoop.ex import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; + import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListener; @@ -40,8 +42,8 @@ public class JobStatisticCollector { public SparkListener getSparkListener() { if (sparkListener == null) { - sparkListener = SparkShims.getInstance() - .getJobMetricsListener(jobIdToStageId, stageIdToJobId, allJobStatistics, finishedJobIds); + sparkListener = new JobMetricsListener(jobIdToStageId, + stageIdToJobId, allJobStatistics, finishedJobIds); } return sparkListener; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java Tue May 14 20:37:37 2024 @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.backend.hadoop.executionengine.spark; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.data.Tuple; -import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.spark.SparkJobStats; -import org.apache.pig.tools.pigstats.spark.Spark1JobStats; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.rdd.RDD; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; -import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; -import org.apache.spark.scheduler.SparkListenerBlockUpdated; -import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorAdded; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorRemoved; -import org.apache.spark.scheduler.SparkListenerJobEnd; -import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; -import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.scheduler.SparkListenerTaskGettingResult; -import org.apache.spark.scheduler.SparkListenerTaskStart; -import org.apache.spark.scheduler.SparkListenerUnpersistRDD; -import scala.Tuple2; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class Spark1Shims extends SparkShims { - @Override - public <T, R> FlatMapFunction<T, R> flatMapFunction(final FlatMapFunctionAdapter<T, R> function) { - return new FlatMapFunction<T, R>() { - @Override - public Iterable<R> call(final T t) throws Exception { - return new Iterable<R>() { - @Override - public Iterator<R> iterator() { - try { - return function.call(t); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - } - }; - } - - @Override - public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) { - return new PairFlatMapFunction<T, K, V>() { - @Override - public Iterable<Tuple2<K, V>> call(final T t) throws Exception { - return new Iterable<Tuple2<K, V>>() { - @Override - public Iterator<Tuple2<K, V>> iterator() { - try { - return function.call(t); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - } - }; - 
} - - @Override - public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle) { - return rdd.coalesce(numPartitions, shuffle, null); - } - - @Override - public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { - return new Spark1JobStats(jobId, plan, conf); - } - - @Override - public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { - return new Spark1JobStats(jobId, plan, conf); - } - - @Override - public <T> OptionalWrapper<T> wrapOptional(T tuple) { - final Optional<T> t = (Optional<T>) tuple; - - return new OptionalWrapper<T>() { - @Override - public boolean isPresent() { - return t.isPresent(); - } - - @Override - public T get() { - return t.get(); - } - }; - } - - private static class JobMetricsListener implements SparkListener { - private final Log LOG = LogFactory.getLog(JobMetricsListener.class); - - private Map<Integer, int[]> jobIdToStageId; - private Map<Integer, Integer> stageIdToJobId; - private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics; - private Set<Integer> finishedJobIds; - - JobMetricsListener(final Map<Integer, int[]> jobIdToStageId, - final Map<Integer, Integer> stageIdToJobId, - final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, - final Set<Integer> finishedJobIds) { - this.jobIdToStageId = jobIdToStageId; - this.stageIdToJobId = stageIdToJobId; - this.allJobMetrics = allJobMetrics; - this.finishedJobIds = finishedJobIds; - } - - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - } - - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - } - - @Override - public void onTaskStart(SparkListenerTaskStart taskStart) { - } - - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { - } - - @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { - } - - @Override - public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { - } - - @Override - public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { - } - - @Override - public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { - int stageId = taskEnd.stageId(); - int stageAttemptId = taskEnd.stageAttemptId(); - String stageIdentifier = stageId + "_" + stageAttemptId; - Integer jobId = stageIdToJobId.get(stageId); - if (jobId == null) { - LOG.warn("Cannot find job id for stage[" + stageId + "]."); - } else { - Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId); - if (jobMetrics == null) { - jobMetrics = Maps.newHashMap(); - allJobMetrics.put(jobId, jobMetrics); - } - List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier); - if (stageMetrics == null) { - stageMetrics = Lists.newLinkedList(); - jobMetrics.put(stageIdentifier, stageMetrics); - } - stageMetrics.add(taskEnd.taskMetrics()); - } - } - - @Override - public synchronized void onJobStart(SparkListenerJobStart jobStart) { - int jobId = jobStart.jobId(); - int size = jobStart.stageIds().size(); - int[] intStageIds = new int[size]; - for (int i = 0; i < size; i++) { - Integer stageId = (Integer) jobStart.stageIds().apply(i); - intStageIds[i] = stageId; - stageIdToJobId.put(stageId, jobId); - } - jobIdToStageId.put(jobId, intStageIds); - } - - @Override - public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { - finishedJobIds.add(jobEnd.jobId()); - notify(); - } - - @Override - public void 
onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { - } - - @Override - public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { - } - - @Override - public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { - } - - @Override - public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { - } - - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStart) { - } - - @Override - public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { - } - - @Override - public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { - } - } - - @Override - public SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId, - Map<Integer, Integer> stageIdToJobId, - Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, - Set<Integer> finishedJobIds) { - return new JobMetricsListener(jobIdToStageId, stageIdToJobId, allJobMetrics, finishedJobIds); - } - - @Override - public void addSparkListener(SparkContext sc, SparkListener sparkListener) { - sc.addSparkListener(sparkListener); - } -} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java Tue May 14 20:37:37 2024 @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.pig.backend.hadoop.executionengine.spark; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.data.Tuple; -import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.spark.Spark2JobStats; -import org.apache.pig.tools.pigstats.spark.SparkJobStats; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.rdd.PartitionCoalescer; -import org.apache.spark.rdd.RDD; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerJobEnd; -import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import scala.Option; -import scala.Tuple2; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class Spark2Shims extends SparkShims { - @Override - public <T, R> FlatMapFunction flatMapFunction(final FlatMapFunctionAdapter<T, R> function) { - return new FlatMapFunction<T, R>() { - @Override - public Iterator<R> call(T t) throws Exception { - return function.call(t); - } - }; - } - - @Override - public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) { - return new PairFlatMapFunction<T, K, V>() { - @Override - public Iterator<Tuple2<K, V>> call(T t) throws Exception { - return function.call(t); - } - }; - } - - @Override - public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle) { - return rdd.coalesce(numPartitions, shuffle, Option.<PartitionCoalescer>empty(), null); - } - - @Override - public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { - return new Spark2JobStats(jobId, plan, conf); - } - - @Override - public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { - return new Spark2JobStats(jobId, plan, conf); - } - - @Override - public <T> OptionalWrapper<T> wrapOptional(T tuple) { - final Optional<T> t = (Optional<T>) tuple; - - return new OptionalWrapper<T>() { - @Override - public boolean isPresent() { - return t.isPresent(); - } - - @Override - public T get() { - return t.get(); - } - }; - } - - private static class JobMetricsListener extends SparkListener { - private final Log LOG = LogFactory.getLog(JobMetricsListener.class); - - private Map<Integer, int[]> jobIdToStageId; - private Map<Integer, Integer> stageIdToJobId; - private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics; - private Set<Integer> finishedJobIds; - - JobMetricsListener(final Map<Integer, int[]> jobIdToStageId, - final Map<Integer, Integer> stageIdToJobId, - final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, - final Set<Integer> finishedJobIds) { - this.jobIdToStageId = jobIdToStageId; - this.stageIdToJobId = stageIdToJobId; - this.allJobMetrics = allJobMetrics; - this.finishedJobIds = finishedJobIds; - } - - @Override - public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - int stageId = stageCompleted.stageInfo().stageId(); - int stageAttemptId = stageCompleted.stageInfo().attemptId(); - String 
stageIdentifier = stageId + "_" + stageAttemptId; - Integer jobId = stageIdToJobId.get(stageId); - if (jobId == null) { - LOG.warn("Cannot find job id for stage[" + stageId + "]."); - } else { - Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId); - if (jobMetrics == null) { - jobMetrics = Maps.newHashMap(); - allJobMetrics.put(jobId, jobMetrics); - } - List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier); - if (stageMetrics == null) { - stageMetrics = Lists.newLinkedList(); - jobMetrics.put(stageIdentifier, stageMetrics); - } - stageMetrics.add(stageCompleted.stageInfo().taskMetrics()); - } - } - - @Override - public synchronized void onJobStart(SparkListenerJobStart jobStart) { - int jobId = jobStart.jobId(); - int size = jobStart.stageIds().size(); - int[] intStageIds = new int[size]; - for (int i = 0; i < size; i++) { - Integer stageId = (Integer) jobStart.stageIds().apply(i); - intStageIds[i] = stageId; - stageIdToJobId.put(stageId, jobId); - } - jobIdToStageId.put(jobId, intStageIds); - } - - @Override - public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { - finishedJobIds.add(jobEnd.jobId()); - notify(); - } - } - - @Override - public SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId, - Map<Integer, Integer> stageIdToJobId, - Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, - Set<Integer> finishedJobIds) { - return new JobMetricsListener(jobIdToStageId, stageIdToJobId, allJobMetrics, finishedJobIds); - } - - @Override - public void addSparkListener(SparkContext sc, SparkListener sparkListener) { - sc.addSparkListener(sparkListener); - } - -} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue May 14 20:37:37 2024 @@ -144,8 +144,6 @@ import org.apache.spark.scheduler.StatsR import com.google.common.base.Joiner; -import static org.apache.pig.backend.hadoop.executionengine.spark.SparkShims.SPARK_VERSION; - /** * Main class that launches pig for Spark */ @@ -598,6 +596,12 @@ public class SparkLauncher extends Launc // HTTP file server doesn't have this restriction, it overwrites the file if added twice String useNettyFileServer = pigCtxtProperties.getProperty(PigConfiguration.PIG_SPARK_USE_NETTY_FILESERVER, "false"); sparkConf.set("spark.rpc.useNettyFileServer", useNettyFileServer); + // Somehow ApplicationMaster stopped picking up __spark_hadoop_conf__.xml + // for unit tests in Spark3. 
Passing through spark_conf + sparkConf.set("spark.hadoop.yarn.resourcemanager.scheduler.address", + jobConf.get("yarn.resourcemanager.scheduler.address")); + sparkConf.set("spark.hadoop.yarn.resourcemanager.address", + jobConf.get("yarn.resourcemanager.address")); if (sparkHome != null && !sparkHome.isEmpty()) { sparkConf.setSparkHome(sparkHome); @@ -645,11 +649,11 @@ public class SparkLauncher extends Launc checkAndConfigureDynamicAllocation(master, sparkConf); sparkContext = new JavaSparkContext(sparkConf); - SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener()); - SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener()); + sparkContext.sc().addSparkListener(jobStatisticCollector.getSparkListener()); + sparkContext.sc().addSparkListener(new StatsReportListener()); allCachedFiles = new HashSet<String>(); } - jobConf.set(SPARK_VERSION, sparkContext.version()); + jobConf.set("pig.spark.version", sparkContext.version()); } private static void checkAndConfigureDynamicAllocation(String master, SparkConf sparkConf) { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java Tue May 14 20:37:37 2024 @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.pig.backend.hadoop.executionengine.spark; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.util.UDFContext; -import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.spark.SparkJobStats; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.rdd.RDD; -import org.apache.spark.scheduler.SparkListener; - -import java.io.Serializable; -import java.lang.reflect.Constructor; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public abstract class SparkShims implements Serializable { - private static final Log LOG = LogFactory.getLog(SparkShims.class); - public static final String SPARK_VERSION = "pig.spark.version"; - - private static SparkShims sparkShims; - - private static SparkShims loadShims(String sparkVersion) throws ReflectiveOperationException { - Class<?> sparkShimsClass; - - if ("2".equals(sparkVersion)) { - LOG.info("Initializing shims for Spark 2.x"); - sparkShimsClass = Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark2Shims"); - } else { - LOG.info("Initializing shims for Spark 1.x"); - sparkShimsClass = Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark1Shims"); - } - - Constructor c = sparkShimsClass.getConstructor(); - return (SparkShims) c.newInstance(); - } - - public static SparkShims getInstance() { - if (sparkShims == null) { - String sparkVersion; - if (UDFContext.getUDFContext().isFrontend()) { - sparkVersion = SparkContext.getOrCreate().version(); - } else { - sparkVersion = UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, ""); - } - LOG.info("Initializing SparkShims for Spark version: " + sparkVersion); - String sparkMajorVersion = getSparkMajorVersion(sparkVersion); - try { - sparkShims = loadShims(sparkMajorVersion); - } catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - return sparkShims; - } - - private static String getSparkMajorVersion(String sparkVersion) { - return sparkVersion.startsWith("2") ? 
"2" : "1"; - } - - public abstract <T, R> FlatMapFunction<T, R> flatMapFunction(FlatMapFunctionAdapter<T, R> function); - - public abstract <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(PairFlatMapFunctionAdapter<T, K, V> function); - - public abstract RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle); - - public abstract SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf); - - public abstract SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf); - - public abstract <T> OptionalWrapper<T> wrapOptional(T tuple); - - public abstract SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId, - Map<Integer, Integer> stageIdToJobId, - Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, - Set<Integer> finishedJobIds); - - public abstract void addSparkListener(SparkContext sc, SparkListener sparkListener); - - public interface OptionalWrapper<T> { - boolean isPresent(); - - T get(); - } -} \ No newline at end of file Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Tue May 14 20:37:37 2024 @@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -54,6 +55,8 @@ import org.apache.pig.impl.plan.PlanExce import org.apache.pig.impl.util.ObjectSerializer; import org.apache.spark.HashPartitioner; import org.apache.spark.Partitioner; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.rdd.RDD; @@ -155,6 +158,22 @@ public class SparkUtil { baseSparkOp.physicalPlan.addAsLeaf(sort); } + public static <T, R> FlatMapFunction flatMapFunction(final FlatMapFunctionAdapter<T, R> function) { + return new FlatMapFunction<T, R>() { + @Override + public Iterator<R> call(T t) throws Exception { + return function.call(t); + } + }; + } + + public static <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) { + return new PairFlatMapFunction<T, K, V>() { + @Override + public Iterator<Tuple2<K, V>> call(T t) throws Exception { + return function.call(t); + } + }; + } - -} \ No newline at end of file +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Tue May 14 20:37:37 2024 @@ -25,7 +25,6 @@ import org.apache.pig.backend.executione import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.spark.rdd.RDD; @@ -40,8 +39,8 @@ public class CollectedGroupConverter imp RDD<Tuple> rdd = predecessors.get(0); CollectedGroupFunction collectedGroupFunction = new CollectedGroupFunction(physicalOperator); - return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(collectedGroupFunction), true) - .rdd(); + return rdd.toJavaRDD().mapPartitions( + SparkUtil.flatMapFunction(collectedGroupFunction), true).rdd(); } private static class CollectedGroupFunction Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Tue May 14 20:37:37 2024 @@ -34,7 +34,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.spark.rdd.RDD; @@ -54,7 +53,7 @@ public class FRJoinConverter implements attachReplicatedInputs((POFRJoinSpark) poFRJoin); FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin); - return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(frJoinFunction), true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(frJoinFunction), true).rdd(); } private void attachReplicatedInputs(POFRJoinSpark poFRJoin) { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Tue May 14 20:37:37 2024 @@ -31,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; 
import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.data.Tuple; @@ -61,7 +60,7 @@ public class ForEachConverter implements RDD<Tuple> rdd = predecessors.get(0); ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes); - return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(forEachFunction), true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(forEachFunction), true).rdd(); } private static class ForEachFunction implements Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Tue May 14 20:37:37 2024 @@ -25,11 +25,13 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; +import org.apache.spark.rdd.PartitionCoalescer; import org.apache.spark.rdd.RDD; +import scala.Option; + @SuppressWarnings({ "serial" }) public class LimitConverter implements RDDConverter<Tuple, Tuple, POLimit> { @@ -39,8 +41,8 @@ public class LimitConverter implements R SparkUtil.assertPredecessorSize(predecessors, poLimit, 1); RDD<Tuple> rdd = predecessors.get(0); LimitFunction limitFunction = new LimitFunction(poLimit); - RDD<Tuple> rdd2 = SparkShims.getInstance().coalesce(rdd, 1, false); - return rdd2.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(limitFunction), false).rdd(); + RDD<Tuple> rdd2 = rdd.coalesce(1, false, Option.<PartitionCoalescer>empty(), null); + return rdd2.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(limitFunction), false).rdd(); } private static class LimitFunction implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java Tue May 14 20:37:37 2024 @@ -25,7 +25,6 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import 
org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.spark.rdd.RDD; @@ -38,7 +37,7 @@ public class MergeCogroupConverter imple SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1); RDD<Tuple> rdd = predecessors.get(0); MergeCogroupFunction mergeCogroupFunction = new MergeCogroupFunction(physicalOperator); - return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeCogroupFunction), true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(mergeCogroupFunction), true).rdd(); } private static class MergeCogroupFunction implements Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Tue May 14 20:37:37 2024 @@ -26,7 +26,6 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.spark.rdd.RDD; @@ -44,7 +43,7 @@ public class MergeJoinConverter implemen RDD<Tuple> rdd = predecessors.get(0); MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin); - return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeJoinFunction), true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(mergeJoinFunction), true).rdd(); } private static class MergeJoinFunction implements Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java Tue May 14 20:37:37 2024 @@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark; import org.apache.pig.data.Tuple; @@ -38,7 +37,7 @@ public class PoissonSampleConverter impl SparkUtil.assertPredecessorSize(predecessors, po, 1); RDD<Tuple> rdd = predecessors.get(0); 
PoissionSampleFunction poissionSampleFunction = new PoissionSampleFunction(po); - return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(poissionSampleFunction), false).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(poissionSampleFunction), false).rdd(); } private static class PoissionSampleFunction implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Tue May 14 20:37:37 2024 @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.Objects; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import scala.Tuple2; @@ -57,7 +56,7 @@ public class SecondaryKeySortUtil { JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions( new IndexedKeyPartitioner(partitionNums)); //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...)) - return sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(new AccumulateByKey(pkgOp)), true).rdd(); + return sorted.mapPartitions(SparkUtil.flatMapFunction(new AccumulateByKey(pkgOp)), true).rdd(); } //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...)) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Tue May 14 20:37:37 2024 @@ -30,7 +30,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.data.DataBag; import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.util.Pair; @@ -55,6 +54,7 @@ import org.apache.pig.impl.plan.PlanExce import org.apache.pig.impl.util.MultiMap; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.Optional; import org.apache.spark.rdd.RDD; public class SkewedJoinConverter implements @@ -103,7 +103,7 @@ public class SkewedJoinConverter impleme // with partition id StreamPartitionIndexKeyFunction streamFun = new StreamPartitionIndexKeyFunction(this, keyDist, defaultParallelism); - JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = 
rdd2.toJavaRDD().flatMap(SparkShims.getInstance().flatMapFunction(streamFun)); + JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(SparkUtil.flatMapFunction(streamFun)); // Tuple2 RDD to Pair RDD JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD = new JavaPairRDD<PartitionIndexedKey, Tuple>( @@ -187,8 +187,7 @@ public class SkewedJoinConverter impleme Tuple leftTuple = tf.newTuple(); if (!innerFlags[0]) { - // left should be Optional<Tuple> - SparkShims.OptionalWrapper<L> leftOption = SparkShims.getInstance().wrapOptional(left); + Optional<Tuple> leftOption = (Optional<Tuple>) left; if (!leftOption.isPresent()) { // Add an empty left record for RIGHT OUTER JOIN. // Notice: if it is a skewed, only join the first reduce key @@ -200,7 +199,7 @@ public class SkewedJoinConverter impleme return this.next(); } } else { - leftTuple = (Tuple) leftOption.get(); + leftTuple = leftOption.get(); } } else { leftTuple = (Tuple) left; @@ -211,14 +210,13 @@ public class SkewedJoinConverter impleme Tuple rightTuple = tf.newTuple(); if (!innerFlags[1]) { - // right should be Optional<Tuple> - SparkShims.OptionalWrapper<R> rightOption = SparkShims.getInstance().wrapOptional(right); + Optional<Tuple> rightOption = (Optional<Tuple>) right; if (!rightOption.isPresent()) { for (int i = 0; i < schemaSize[1]; i++) { rightTuple.append(null); } } else { - rightTuple = (Tuple) rightOption.get(); + rightTuple = rightOption.get(); } } else { rightTuple = (Tuple) right; @@ -608,22 +606,22 @@ public class SkewedJoinConverter impleme JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD. join(streamIndexedJavaPairRDD, partitioner); - return resultKeyValue.mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun)); + return resultKeyValue.mapPartitions(SparkUtil.flatMapFunction(toValueFun)); } else if (innerFlags[0] && !innerFlags[1]) { // left outer join return skewIndexedJavaPairRDD .leftOuterJoin(streamIndexedJavaPairRDD, partitioner) - .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun)); + .mapPartitions(SparkUtil.flatMapFunction(toValueFun)); } else if (!innerFlags[0] && innerFlags[1]) { // right outer join return skewIndexedJavaPairRDD .rightOuterJoin(streamIndexedJavaPairRDD, partitioner) - .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun)); + .mapPartitions(SparkUtil.flatMapFunction(toValueFun)); } else { // full outer join return skewIndexedJavaPairRDD .fullOuterJoin(streamIndexedJavaPairRDD, partitioner) - .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun)); + .mapPartitions(SparkUtil.flatMapFunction(toValueFun)); } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Tue May 14 20:37:37 2024 @@ -24,7 +24,6 @@ import java.util.List; import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; -import 
org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import scala.Tuple2; import scala.runtime.AbstractFunction1; @@ -58,7 +57,7 @@ public class SortConverter implements RD JavaPairRDD<Tuple, Object> sorted = r.sortByKey( sortOperator.getMComparator(), true, parallelism); - JavaRDD<Tuple> mapped = sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(TO_VALUE_FUNCTION)); + JavaRDD<Tuple> mapped = sorted.mapPartitions(SparkUtil.flatMapFunction(TO_VALUE_FUNCTION)); return mapped.rdd(); } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java Tue May 14 20:37:37 2024 @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import org.apache.pig.backend.hadoop.executionengine.spark.PairFlatMapFunctionAdapter; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.spark.api.java.function.Function2; import scala.Tuple2; @@ -65,7 +64,7 @@ public class SparkSampleSortConverter im //sort sample data JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true); //convert every element in sample data from element to (all, element) format - JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(SparkShims.getInstance().pairFlatMapFunction(new AggregateFunction())); + JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(SparkUtil.pairFlatMapFunction(new AggregateFunction())); //use groupByKey to aggregate all values( the format will be ((all),{(sampleEle1),(sampleEle2),...} ) JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new ToValueFunction()); return groupByKey.rdd(); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Tue May 14 20:37:37 2024 @@ -38,7 +38,7 @@ public class StreamConverter implements SparkUtil.assertPredecessorSize(predecessors, poStream, 1); RDD<Tuple> rdd = predecessors.get(0); StreamFunction streamFunction = new StreamFunction(poStream); - return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(streamFunction), true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkUtil.flatMapFunction(streamFunction), true).rdd(); } private static class StreamFunction implements Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java Tue May 14 20:37:37 2024 @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.tools.pigstats.spark; - -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.PigStatsUtil; -import org.apache.spark.executor.ShuffleReadMetrics; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.executor.TaskMetrics; -import scala.Option; - -import java.util.List; -import java.util.Map; - -public class Spark1JobStats extends SparkJobStats { - public Spark1JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { - super(jobId, plan, conf); - } - - public Spark1JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { - super(jobId, plan, conf); - } - - @Override - protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) { - Map<String, Long> results = Maps.newLinkedHashMap(); - - long executorDeserializeTime = 0; - long executorRunTime = 0; - long resultSize = 0; - long jvmGCTime = 0; - long resultSerializationTime = 0; - long memoryBytesSpilled = 0; - long diskBytesSpilled = 0; - long bytesRead = 0; - long bytesWritten = 0; - long remoteBlocksFetched = 0; - long localBlocksFetched = 0; - long fetchWaitTime = 0; - long remoteBytesRead = 0; - long shuffleBytesWritten = 0; - long shuffleWriteTime = 0; - boolean inputMetricExist = false; - boolean outputMetricExist = false; - boolean shuffleReadMetricExist = false; - boolean shuffleWriteMetricExist = false; - - for (List<TaskMetrics> stageMetric : jobMetric.values()) { - if (stageMetric != null) { - for (TaskMetrics taskMetrics : stageMetric) { - if (taskMetrics != null) { - executorDeserializeTime += taskMetrics.executorDeserializeTime(); - executorRunTime += taskMetrics.executorRunTime(); - resultSize += taskMetrics.resultSize(); - jvmGCTime += taskMetrics.jvmGCTime(); - resultSerializationTime += taskMetrics.resultSerializationTime(); - memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); - diskBytesSpilled += taskMetrics.diskBytesSpilled(); - if (!taskMetrics.inputMetrics().isEmpty()) { - inputMetricExist = true; - bytesRead += taskMetrics.inputMetrics().get().bytesRead(); - } - - if (!taskMetrics.outputMetrics().isEmpty()) { - outputMetricExist = true; - bytesWritten += 
taskMetrics.outputMetrics().get().bytesWritten(); - } - - Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); - if (!shuffleReadMetricsOption.isEmpty()) { - shuffleReadMetricExist = true; - remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); - localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); - fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); - remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); - } - - Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); - if (!shuffleWriteMetricsOption.isEmpty()) { - shuffleWriteMetricExist = true; - shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); - shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); - } - - } - } - } - } - - results.put("ExcutorDeserializeTime", executorDeserializeTime); - results.put("ExecutorRunTime", executorRunTime); - results.put("ResultSize", resultSize); - results.put("JvmGCTime", jvmGCTime); - results.put("ResultSerializationTime", resultSerializationTime); - results.put("MemoryBytesSpilled", memoryBytesSpilled); - results.put("DiskBytesSpilled", diskBytesSpilled); - if (inputMetricExist) { - results.put("BytesRead", bytesRead); - hdfsBytesRead = bytesRead; - counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead); - } - - if (outputMetricExist) { - results.put("BytesWritten", bytesWritten); - hdfsBytesWritten = bytesWritten; - counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten); - } - - if (shuffleReadMetricExist) { - results.put("RemoteBlocksFetched", remoteBlocksFetched); - results.put("LocalBlocksFetched", localBlocksFetched); - results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); - results.put("FetchWaitTime", fetchWaitTime); - results.put("RemoteBytesRead", remoteBytesRead); - } - - if (shuffleWriteMetricExist) { - results.put("ShuffleBytesWritten", shuffleBytesWritten); - results.put("ShuffleWriteTime", shuffleWriteTime); - } - - return results; - } -} Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java Tue May 14 20:37:37 2024 @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.pig.tools.pigstats.spark; - -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.PigStatsUtil; -import org.apache.spark.executor.ShuffleReadMetrics; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.executor.TaskMetrics; - -import java.util.List; -import java.util.Map; - -public class Spark2JobStats extends SparkJobStats { - public Spark2JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { - super(jobId, plan, conf); - } - - public Spark2JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { - super(jobId, plan, conf); - } - - @Override - protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) { - Map<String, Long> results = Maps.newLinkedHashMap(); - - long executorDeserializeTime = 0; - long executorRunTime = 0; - long resultSize = 0; - long jvmGCTime = 0; - long resultSerializationTime = 0; - long memoryBytesSpilled = 0; - long diskBytesSpilled = 0; - long bytesRead = 0; - long bytesWritten = 0; - long remoteBlocksFetched = 0; - long localBlocksFetched = 0; - long fetchWaitTime = 0; - long remoteBytesRead = 0; - long shuffleBytesWritten = 0; - long shuffleWriteTime = 0; - - for (List<TaskMetrics> stageMetric : jobMetric.values()) { - if (stageMetric != null) { - for (TaskMetrics taskMetrics : stageMetric) { - if (taskMetrics != null) { - executorDeserializeTime += taskMetrics.executorDeserializeTime(); - executorRunTime += taskMetrics.executorRunTime(); - resultSize += taskMetrics.resultSize(); - jvmGCTime += taskMetrics.jvmGCTime(); - resultSerializationTime += taskMetrics.resultSerializationTime(); - memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); - diskBytesSpilled += taskMetrics.diskBytesSpilled(); - bytesRead += taskMetrics.inputMetrics().bytesRead(); - - bytesWritten += taskMetrics.outputMetrics().bytesWritten(); - - ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); - remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched(); - localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched(); - fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime(); - remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead(); - - ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); - shuffleBytesWritten += shuffleWriteMetricsOption.shuffleBytesWritten(); - shuffleWriteTime += shuffleWriteMetricsOption.shuffleWriteTime(); - } - } - } - } - - results.put("ExcutorDeserializeTime", executorDeserializeTime); - results.put("ExecutorRunTime", executorRunTime); - results.put("ResultSize", resultSize); - results.put("JvmGCTime", jvmGCTime); - results.put("ResultSerializationTime", resultSerializationTime); - results.put("MemoryBytesSpilled", memoryBytesSpilled); - results.put("DiskBytesSpilled", diskBytesSpilled); - - results.put("BytesRead", bytesRead); - hdfsBytesRead = bytesRead; - counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead); - - results.put("BytesWritten", bytesWritten); - hdfsBytesWritten = bytesWritten; - counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten); - - results.put("RemoteBlocksFetched", remoteBlocksFetched); - results.put("LocalBlocksFetched", localBlocksFetched); - results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); - results.put("FetchWaitTime", fetchWaitTime); - 
results.put("RemoteBytesRead", remoteBytesRead); - - results.put("ShuffleBytesWritten", shuffleBytesWritten); - results.put("ShuffleWriteTime", shuffleWriteTime); - - return results; - } -} Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Tue May 14 20:37:37 2024 @@ -35,11 +35,13 @@ import org.apache.pig.tools.pigstats.Job import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStatsUtil; +import org.apache.spark.executor.ShuffleReadMetrics; +import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import com.google.common.collect.Maps; -public abstract class SparkJobStats extends JobStats { +public class SparkJobStats extends JobStats { private int jobId; private Map<String, Long> stats = Maps.newLinkedHashMap(); @@ -110,8 +112,82 @@ public abstract class SparkJobStats exte } } - protected abstract Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric); + protected Map<String, Long> combineTaskMetrics( + Map<String, List<TaskMetrics>> jobMetric) { + Map<String, Long> results = Maps.newLinkedHashMap(); + + long executorDeserializeTime = 0; + long executorRunTime = 0; + long resultSize = 0; + long jvmGCTime = 0; + long resultSerializationTime = 0; + long memoryBytesSpilled = 0; + long diskBytesSpilled = 0; + long bytesRead = 0; + long bytesWritten = 0; + long remoteBlocksFetched = 0; + long localBlocksFetched = 0; + long fetchWaitTime = 0; + long remoteBytesRead = 0; + long shuffleBytesWritten = 0; + long shuffleWriteTime = 0; + + for (List<TaskMetrics> stageMetric : jobMetric.values()) { + if (stageMetric != null) { + for (TaskMetrics taskMetrics : stageMetric) { + if (taskMetrics != null) { + executorDeserializeTime += taskMetrics.executorDeserializeTime(); + executorRunTime += taskMetrics.executorRunTime(); + resultSize += taskMetrics.resultSize(); + jvmGCTime += taskMetrics.jvmGCTime(); + resultSerializationTime += taskMetrics.resultSerializationTime(); + memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); + diskBytesSpilled += taskMetrics.diskBytesSpilled(); + bytesRead += taskMetrics.inputMetrics().bytesRead(); + + bytesWritten += taskMetrics.outputMetrics().bytesWritten(); + + ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); + remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched(); + localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched(); + fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime(); + remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead(); + + ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); + shuffleBytesWritten += shuffleWriteMetricsOption.bytesWritten(); + shuffleWriteTime += shuffleWriteMetricsOption.writeTime(); + } + } + } + } + + results.put("ExcutorDeserializeTime", executorDeserializeTime); + results.put("ExecutorRunTime", executorRunTime); + results.put("ResultSize", resultSize); + results.put("JvmGCTime", jvmGCTime); + results.put("ResultSerializationTime", resultSerializationTime); + 
results.put("MemoryBytesSpilled", memoryBytesSpilled); + results.put("DiskBytesSpilled", diskBytesSpilled); + + results.put("BytesRead", bytesRead); + hdfsBytesRead = bytesRead; + counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead); + + results.put("BytesWritten", bytesWritten); + hdfsBytesWritten = bytesWritten; + counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten); + + results.put("RemoteBlocksFetched", remoteBlocksFetched); + results.put("LocalBlocksFetched", localBlocksFetched); + results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); + results.put("FetchWaitTime", fetchWaitTime); + results.put("RemoteBytesRead", remoteBytesRead); + results.put("ShuffleBytesWritten", shuffleBytesWritten); + results.put("ShuffleWriteTime", shuffleWriteTime); + + return results; + } public Map<String, Long> getStats() { return stats; }