Author: zly
Date: Wed Jul 19 01:32:41 2017
New Revision: 1802347

URL: http://svn.apache.org/viewvc?rev=1802347&view=rev
Log:
PIG-5157: Upgrade to Spark 2.0 (nkollar via liyunzhang)
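The patch introduces a small shim layer so the same Pig codebase can run on both Spark 1.x and 2.x: converters now implement the version-neutral FlatMapFunctionAdapter / PairFlatMapFunctionAdapter interfaces, and SparkShims.getInstance() loads either Spark1Shims or Spark2Shims at runtime, keyed off the Spark version recorded in the job configuration. The sketch below is illustrative only and is not part of the commit; SparkShims, FlatMapFunctionAdapter and the mapPartitions call mirror the classes added in this revision, while ShimUsageSketch and the SplitWords adapter are made-up placeholders standing in for a real converter and its Pig physical operator.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
    import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ShimUsageSketch {

        // Version-neutral replacement for Spark's FlatMapFunction: it always returns
        // an Iterator, regardless of whether Spark 1.x (Iterable) or 2.x (Iterator)
        // semantics apply underneath. The word-splitting body is a placeholder for
        // what a real converter does with a Pig physical operator.
        private static class SplitWords implements FlatMapFunctionAdapter<Iterator<String>, String> {
            @Override
            public Iterator<String> call(Iterator<String> lines) {
                List<String> words = new ArrayList<>();
                while (lines.hasNext()) {
                    words.addAll(Arrays.asList(lines.next().split("\\s+")));
                }
                return words.iterator();
            }
        }

        public static JavaRDD<String> apply(JavaSparkContext sc) {
            JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "pig on spark"));
            // SparkShims.getInstance() reflectively loads Spark1Shims or Spark2Shims
            // based on the "pig.spark.version" value stored in the job conf, then
            // adapts SplitWords into whichever FlatMapFunction signature the running
            // Spark version expects. (This assumes the launcher has already populated
            // the UDFContext job conf, as SparkLauncher does before converters run.)
            return lines.mapPartitions(
                    SparkShims.getInstance().flatMapFunction(new SplitWords()), true);
        }
    }

Keeping all version-specific code behind Spark1Shims/Spark2Shims is also what lets the new build.xml "jar" target compile the tree twice (excluding Spark1*.java or Spark2*.java in turn) and merge both sets of shim classes into a single pig jar.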
Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java
Removed:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
Modified:
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Wed Jul 19 01:32:41 2017
@@ -207,8 +207,7 @@
     <property name="ivy.repo.dir" value="${user.home}/ivyrepo" />
     <property name="ivy.dir" location="ivy" />
     <property name="loglevel" value="quiet" />
-    <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
-
+    <loadproperties srcfile="${ivy.dir}/libraries.properties" />

    <!--
    Hadoop master version
@@ -241,6 +240,11 @@
      </then>
    </if>
    <property name="hbaseversion" value="1" />
+   <property name="sparkversion" value="1" />
+
+   <condition property="src.exclude.dir" value="**/Spark2*.java" else="**/Spark1*.java">
+     <equals arg1="${sparkversion}" arg2="1"/>
+   </condition>
    <property name="src.shims.dir"
value="${basedir}/shims/src/hadoop${hadoopversion}" /> <property name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" /> @@ -556,7 +560,7 @@ <echo>*** Building Main Sources ***</echo> <echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo> <echo>*** Else, you will only be warned about deprecations ***</echo> - <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ***</echo> + <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ; Spark version used: ${sparkversion} ***</echo> <compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/bzip2;${src.shims.dir}" excludes="${src.exclude.dir}" dist="${build.classes}" cp="classpath" warnings="${javac.args.warnings}" /> <copy todir="${build.classes}/META-INF"> @@ -688,7 +692,7 @@ <!-- ================================================================== --> <!-- Make pig.jar --> <!-- ================================================================== --> - <target name="jar" depends="compile,ivy-buildJar" description="Create pig core jar"> + <target name="jar-simple" depends="compile,ivy-buildJar" description="Create pig core jar"> <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/> <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.withouthadoop}" includedJars="runtime.dependencies-withouthadoop.jar"/> <antcall target="copyCommonDependencies"/> @@ -788,6 +792,35 @@ </sequential> </macrodef> + <target name="jar-core" depends="compile,ivy-buildJar" description="Create only pig core jar"> + <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/> + </target> + + <target name="jar" description="Create pig jar with Spark 1 and 2"> + <echo>Compiling against Spark 2</echo> + <antcall target="clean" inheritRefs="true" inheritall="true"/> + <propertyreset name="sparkversion" value="2"/> + <propertyreset name="src.exclude.dir" value="**/Spark1*.java" /> + <antcall target="jar-core" inheritRefs="true" inheritall="true"/> + <move file="${output.jarfile.core}" tofile="${basedir}/_pig-shims.jar"/> + + <echo>Compiling against Spark 1</echo> + <antcall target="clean" inheritRefs="true" inheritall="true"/> + <propertyreset name="sparkversion" value="1"/> + <propertyreset name="src.exclude.dir" value="**/Spark2*.java" /> + <antcall target="jar-simple" inheritRefs="true" inheritall="true"/> + <jar update="yes" jarfile="${output.jarfile.core}"> + <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> + </jar> + <jar update="yes" jarfile="${output.jarfile.backcompat-core-h2}"> + <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> + </jar> + <jar update="yes" jarfile="${output.jarfile.withouthadoop-h2}"> + <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/> + </jar> + <delete file="${basedir}/_pig-shims.jar"/> + </target> + <!-- ================================================================== --> <!-- macrodef: buildJar --> <!-- ================================================================== --> @@ -1655,7 +1688,7 @@ <target name="ivy-resolve" depends="ivy-init" unless="ivy.resolved" description="Resolve Ivy dependencies"> <property name="ivy.resolved" value="true"/> - <echo>*** Ivy resolve with Hadoop ${hadoopversion} and HBase ${hbaseversion} ***</echo> + <echo>*** Ivy resolve with Hadoop ${hadoopversion}, Spark ${sparkversion} and 
HBase ${hbaseversion} ***</echo> <ivy:resolve log="${loglevel}" settingsRef="${ant.project.name}.ivy.settings" conf="compile"/> <ivy:report toDir="build/ivy/report"/> </target> @@ -1664,7 +1697,7 @@ <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/> <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}" - pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark"/> + pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion}"/> <ivy:cachepath pathid="compile.classpath" conf="compile"/> </target> Modified: pig/trunk/ivy.xml URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/ivy.xml (original) +++ pig/trunk/ivy.xml Wed Jul 19 01:32:41 2017 @@ -40,7 +40,8 @@ <conf name="buildJar" extends="compile,test" visibility="private"/> <conf name="hadoop2" visibility="private"/> <conf name="hbase1" visibility="private"/> - <conf name="spark" visibility="private" /> + <conf name="spark1" visibility="private" /> + <conf name="spark2" visibility="private" /> </configurations> <publications> <artifact name="pig" conf="master"/> @@ -407,8 +408,8 @@ <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/> - <!-- for Spark integration --> - <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark.version}" conf="spark->default"> + <!-- for Spark 1.x integration --> + <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark1.version}" conf="spark1->default"> <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/> <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/> <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/> @@ -418,12 +419,28 @@ <exclude org="jline" module="jline"/> <exclude org="com.google.guava" /> </dependency> - <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark.version}" conf="spark->default"> + <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark1.version}" conf="spark1->default"> <exclude org="org.apache.hadoop" /> </dependency> + + <!-- for Spark 2.x integration --> + <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark2.version}" conf="spark2->default"> + <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/> + <exclude org="org.apache.hadoop" /> + <exclude org="com.esotericsoftware.kryo" /> + <exclude org="jline" module="jline"/> + <exclude org="com.google.guava" /> + </dependency> + <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark2.version}" conf="spark2->default"> + <exclude org="org.apache.hadoop" /> + </dependency> + <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/> - <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark->default"/> - <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark->default"/> + <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark1->default;spark2->default"/> + <dependency 
org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark1->default;spark2->default"/> <!-- for Tez integration --> <dependency org="org.apache.tez" name="tez" rev="${tez.version}" Modified: pig/trunk/ivy/libraries.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/ivy/libraries.properties (original) +++ pig/trunk/ivy/libraries.properties Wed Jul 19 01:32:41 2017 @@ -73,7 +73,8 @@ netty-all.version=4.0.23.Final rats-lib.version=0.5.1 slf4j-api.version=1.6.1 slf4j-log4j12.version=1.6.1 -spark.version=1.6.1 +spark1.version=1.6.1 +spark2.version=2.1.1 xerces.version=2.10.0 xalan.version=2.7.1 wagon-http.version=1.0-beta-2 Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java?rev=1802347&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java Wed Jul 19 01:32:41 2017 @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.Serializable; +import java.util.Iterator; + +public interface FlatMapFunctionAdapter<R, T> extends Serializable { + Iterator<T> call(final R r) throws Exception; +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Wed Jul 19 01:32:41 2017 @@ -81,7 +81,7 @@ public class JobGraphBuilder extends Spa private Map<Class<? 
extends PhysicalOperator>, RDDConverter> convertMap = null; private SparkPigStats sparkStats = null; private JavaSparkContext sparkContext = null; - private JobMetricsListener jobMetricsListener = null; + private JobStatisticCollector jobStatisticCollector = null; private String jobGroupID = null; private Set<Integer> seenJobIDs = new HashSet<Integer>(); private SparkOperPlan sparkPlan = null; @@ -91,14 +91,14 @@ public class JobGraphBuilder extends Spa private PigContext pc; public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap, - SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener - jobMetricsListener, String jobGroupID, JobConf jobConf, PigContext pc) { + SparkPigStats sparkStats, JavaSparkContext sparkContext, JobStatisticCollector + jobStatisticCollector, String jobGroupID, JobConf jobConf, PigContext pc) { super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true)); this.sparkPlan = plan; this.convertMap = convertMap; this.sparkStats = sparkStats; this.sparkContext = sparkContext; - this.jobMetricsListener = jobMetricsListener; + this.jobStatisticCollector = jobStatisticCollector; this.jobGroupID = jobGroupID; this.jobConf = jobConf; this.pc = pc; @@ -223,7 +223,7 @@ public class JobGraphBuilder extends Spa } } SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator, - jobMetricsListener, sparkContext, sparkStats); + jobStatisticCollector, sparkContext, sparkStats); } } else { for (POStore poStore : poStores) { Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java?rev=1802347&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java Wed Jul 19 01:32:41 2017 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.backend.hadoop.executionengine.spark; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.SparkListener; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class JobStatisticCollector { + + private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap(); + private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap(); + private final Map<Integer, Map<String, List<TaskMetrics>>> allJobStatistics = Maps.newHashMap(); + private final Set<Integer> finishedJobIds = Sets.newHashSet(); + + private SparkListener sparkListener; + + public SparkListener getSparkListener() { + if (sparkListener == null) { + sparkListener = SparkShims.getInstance() + .getJobMetricsListener(jobIdToStageId, stageIdToJobId, allJobStatistics, finishedJobIds); + } + return sparkListener; + } + + public Map<String, List<TaskMetrics>> getJobMetric(int jobId) { + synchronized (sparkListener) { + return allJobStatistics.get(jobId); + } + } + + public boolean waitForJobToEnd(int jobId) throws InterruptedException { + synchronized (sparkListener) { + if (finishedJobIds.contains(jobId)) { + finishedJobIds.remove(jobId); + return true; + } + + sparkListener.wait(); + return false; + } + } + + public void cleanup(int jobId) { + synchronized (sparkListener) { + allJobStatistics.remove(jobId); + jobIdToStageId.remove(jobId); + Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Integer, Integer> entry = iterator.next(); + if (entry.getValue() == jobId) { + iterator.remove(); + } + } + } + } + + public void reset() { + synchronized (sparkListener) { + stageIdToJobId.clear(); + jobIdToStageId.clear(); + allJobStatistics.clear(); + finishedJobIds.clear(); + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java?rev=1802347&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java Wed Jul 19 01:32:41 2017 @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import scala.Tuple2; + +import java.io.Serializable; +import java.util.Iterator; + +public interface PairFlatMapFunctionAdapter<T, K, V> extends Serializable { + Iterator<Tuple2<K, V>> call(T t) throws Exception; +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java?rev=1802347&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java Wed Jul 19 01:32:41 2017 @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.data.Tuple; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.spark.SparkJobStats; +import org.apache.pig.tools.pigstats.spark.Spark1JobStats; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.rdd.RDD; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; +import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; +import org.apache.spark.scheduler.SparkListenerBlockUpdated; +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorAdded; +import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorRemoved; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.SparkListenerTaskGettingResult; +import org.apache.spark.scheduler.SparkListenerTaskStart; +import 
org.apache.spark.scheduler.SparkListenerUnpersistRDD; +import scala.Tuple2; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class Spark1Shims extends SparkShims { + @Override + public <T, R> FlatMapFunction<T, R> flatMapFunction(final FlatMapFunctionAdapter<T, R> function) { + return new FlatMapFunction<T, R>() { + @Override + public Iterable<R> call(final T t) throws Exception { + return new Iterable<R>() { + @Override + public Iterator<R> iterator() { + try { + return function.call(t); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + } + }; + } + + @Override + public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) { + return new PairFlatMapFunction<T, K, V>() { + @Override + public Iterable<Tuple2<K, V>> call(final T t) throws Exception { + return new Iterable<Tuple2<K, V>>() { + @Override + public Iterator<Tuple2<K, V>> iterator() { + try { + return function.call(t); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + } + }; + } + + @Override + public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle) { + return rdd.coalesce(numPartitions, shuffle, null); + } + + @Override + public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { + return new Spark1JobStats(jobId, plan, conf); + } + + @Override + public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { + return new Spark1JobStats(jobId, plan, conf); + } + + @Override + public <T> OptionalWrapper<T> wrapOptional(T tuple) { + final Optional<T> t = (Optional<T>) tuple; + + return new OptionalWrapper<T>() { + @Override + public boolean isPresent() { + return t.isPresent(); + } + + @Override + public T get() { + return t.get(); + } + }; + } + + private static class JobMetricsListener implements SparkListener { + private final Log LOG = LogFactory.getLog(JobMetricsListener.class); + + private Map<Integer, int[]> jobIdToStageId; + private Map<Integer, Integer> stageIdToJobId; + private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics; + private Set<Integer> finishedJobIds; + + JobMetricsListener(final Map<Integer, int[]> jobIdToStageId, + final Map<Integer, Integer> stageIdToJobId, + final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, + final Set<Integer> finishedJobIds) { + this.jobIdToStageId = jobIdToStageId; + this.stageIdToJobId = stageIdToJobId; + this.allJobMetrics = allJobMetrics; + this.finishedJobIds = finishedJobIds; + } + + @Override + public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { + } + + @Override + public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { + } + + @Override + public void onTaskStart(SparkListenerTaskStart taskStart) { + } + + @Override + public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { + } + + @Override + public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { + } + + @Override + public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { + } + + @Override + public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { + } + + @Override + public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { + int stageId = taskEnd.stageId(); + int stageAttemptId = taskEnd.stageAttemptId(); + String stageIdentifier = stageId + "_" + stageAttemptId; + Integer jobId = stageIdToJobId.get(stageId); + if (jobId == 
null) { + LOG.warn("Cannot find job id for stage[" + stageId + "]."); + } else { + Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId); + if (jobMetrics == null) { + jobMetrics = Maps.newHashMap(); + allJobMetrics.put(jobId, jobMetrics); + } + List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier); + if (stageMetrics == null) { + stageMetrics = Lists.newLinkedList(); + jobMetrics.put(stageIdentifier, stageMetrics); + } + stageMetrics.add(taskEnd.taskMetrics()); + } + } + + @Override + public synchronized void onJobStart(SparkListenerJobStart jobStart) { + int jobId = jobStart.jobId(); + int size = jobStart.stageIds().size(); + int[] intStageIds = new int[size]; + for (int i = 0; i < size; i++) { + Integer stageId = (Integer) jobStart.stageIds().apply(i); + intStageIds[i] = stageId; + stageIdToJobId.put(stageId, jobId); + } + jobIdToStageId.put(jobId, intStageIds); + } + + @Override + public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { + finishedJobIds.add(jobEnd.jobId()); + notify(); + } + + @Override + public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { + } + + @Override + public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { + } + + @Override + public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { + } + + @Override + public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { + } + + @Override + public void onApplicationStart(SparkListenerApplicationStart applicationStart) { + } + + @Override + public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { + } + + @Override + public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { + } + } + + @Override + public SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId, + Map<Integer, Integer> stageIdToJobId, + Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, + Set<Integer> finishedJobIds) { + return new JobMetricsListener(jobIdToStageId, stageIdToJobId, allJobMetrics, finishedJobIds); + } + + @Override + public void addSparkListener(SparkContext sc, SparkListener sparkListener) { + sc.addSparkListener(sparkListener); + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java?rev=1802347&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java Wed Jul 19 01:32:41 2017 @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.data.Tuple; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.spark.Spark2JobStats; +import org.apache.pig.tools.pigstats.spark.SparkJobStats; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.rdd.PartitionCoalescer; +import org.apache.spark.rdd.RDD; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import scala.Option; +import scala.Tuple2; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class Spark2Shims extends SparkShims { + @Override + public <T, R> FlatMapFunction flatMapFunction(final FlatMapFunctionAdapter<T, R> function) { + return new FlatMapFunction<T, R>() { + @Override + public Iterator<R> call(T t) throws Exception { + return function.call(t); + } + }; + } + + @Override + public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) { + return new PairFlatMapFunction<T, K, V>() { + @Override + public Iterator<Tuple2<K, V>> call(T t) throws Exception { + return function.call(t); + } + }; + } + + @Override + public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle) { + return rdd.coalesce(numPartitions, shuffle, Option.<PartitionCoalescer>empty(), null); + } + + @Override + public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { + return new Spark2JobStats(jobId, plan, conf); + } + + @Override + public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { + return new Spark2JobStats(jobId, plan, conf); + } + + @Override + public <T> OptionalWrapper<T> wrapOptional(T tuple) { + final Optional<T> t = (Optional<T>) tuple; + + return new OptionalWrapper<T>() { + @Override + public boolean isPresent() { + return t.isPresent(); + } + + @Override + public T get() { + return t.get(); + } + }; + } + + private static class JobMetricsListener extends SparkListener { + private final Log LOG = LogFactory.getLog(JobMetricsListener.class); + + private Map<Integer, int[]> jobIdToStageId; + private Map<Integer, Integer> stageIdToJobId; + private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics; + private Set<Integer> finishedJobIds; + + JobMetricsListener(final Map<Integer, int[]> jobIdToStageId, + final Map<Integer, Integer> stageIdToJobId, + final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, + final Set<Integer> finishedJobIds) { + this.jobIdToStageId = jobIdToStageId; + this.stageIdToJobId = stageIdToJobId; + this.allJobMetrics = allJobMetrics; + this.finishedJobIds = finishedJobIds; + } + + @Override + public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompleted) { + int stageId = 
stageCompleted.stageInfo().stageId(); + int stageAttemptId = stageCompleted.stageInfo().attemptId(); + String stageIdentifier = stageId + "_" + stageAttemptId; + Integer jobId = stageIdToJobId.get(stageId); + if (jobId == null) { + LOG.warn("Cannot find job id for stage[" + stageId + "]."); + } else { + Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId); + if (jobMetrics == null) { + jobMetrics = Maps.newHashMap(); + allJobMetrics.put(jobId, jobMetrics); + } + List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier); + if (stageMetrics == null) { + stageMetrics = Lists.newLinkedList(); + jobMetrics.put(stageIdentifier, stageMetrics); + } + stageMetrics.add(stageCompleted.stageInfo().taskMetrics()); + } + } + + @Override + public synchronized void onJobStart(SparkListenerJobStart jobStart) { + int jobId = jobStart.jobId(); + int size = jobStart.stageIds().size(); + int[] intStageIds = new int[size]; + for (int i = 0; i < size; i++) { + Integer stageId = (Integer) jobStart.stageIds().apply(i); + intStageIds[i] = stageId; + stageIdToJobId.put(stageId, jobId); + } + jobIdToStageId.put(jobId, intStageIds); + } + + @Override + public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { + finishedJobIds.add(jobEnd.jobId()); + notify(); + } + } + + @Override + public SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId, + Map<Integer, Integer> stageIdToJobId, + Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, + Set<Integer> finishedJobIds) { + return new JobMetricsListener(jobIdToStageId, stageIdToJobId, allJobMetrics, finishedJobIds); + } + + @Override + public void addSparkListener(SparkContext sc, SparkListener sparkListener) { + sc.addSparkListener(sparkListener); + } + +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Jul 19 01:32:41 2017 @@ -136,11 +136,12 @@ import org.apache.pig.tools.pigstats.spa import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.scheduler.JobLogger; import org.apache.spark.scheduler.StatsReportListener; import com.google.common.base.Joiner; +import static org.apache.pig.backend.hadoop.executionengine.spark.SparkShims.SPARK_VERSION; + /** * Main class that launches pig for Spark */ @@ -152,7 +153,7 @@ public class SparkLauncher extends Launc // across jobs, because a // new SparkLauncher gets created for each job. 
private static JavaSparkContext sparkContext = null; - private static JobMetricsListener jobMetricsListener = new JobMetricsListener(); + private static JobStatisticCollector jobStatisticCollector = new JobStatisticCollector(); private String jobGroupID; private PigContext pigContext = null; private JobConf jobConf = null; @@ -174,9 +175,10 @@ public class SparkLauncher extends Launc SparkPigStats sparkStats = (SparkPigStats) pigContext .getExecutionEngine().instantiatePigStats(); sparkStats.initialize(pigContext, sparkplan, jobConf); + UDFContext.getUDFContext().addJobConf(jobConf); PigStats.start(sparkStats); - startSparkIfNeeded(pigContext); + startSparkIfNeeded(jobConf, pigContext); jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(), UUID.randomUUID().toString()); @@ -184,7 +186,7 @@ public class SparkLauncher extends Launc sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", false); - jobMetricsListener.reset(); + jobStatisticCollector.reset(); this.currentDirectoryPath = Paths.get(".").toAbsolutePath() .normalize().toString() @@ -231,7 +233,7 @@ public class SparkLauncher extends Launc } uploadResources(sparkplan); - new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit(); + new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobStatisticCollector, jobGroupID, jobConf, pigContext).visit(); cleanUpSparkJob(sparkStats); sparkStats.finish(); resetUDFContext(); @@ -539,7 +541,7 @@ public class SparkLauncher extends Launc * Only one SparkContext may be active per JVM (SPARK-2243). When multiple threads start SparkLaucher, * the static member sparkContext should be initialized only once */ - private static synchronized void startSparkIfNeeded(PigContext pc) throws PigException { + private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext pc) throws PigException { if (sparkContext == null) { String master = null; if (pc.getExecType().isLocal()) { @@ -594,9 +596,9 @@ public class SparkLauncher extends Launc checkAndConfigureDynamicAllocation(master, sparkConf); sparkContext = new JavaSparkContext(sparkConf); - sparkContext.sc().addSparkListener(new StatsReportListener()); - sparkContext.sc().addSparkListener(new JobLogger()); - sparkContext.sc().addSparkListener(jobMetricsListener); + jobConf.set(SPARK_VERSION, sparkContext.version()); + SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener()); + SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener()); } } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java?rev=1802347&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java Wed Jul 19 01:32:41 2017 @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.spark.SparkJobStats; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.rdd.RDD; +import org.apache.spark.scheduler.SparkListener; + +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public abstract class SparkShims implements Serializable { + private static final Log LOG = LogFactory.getLog(SparkShims.class); + public static final String SPARK_VERSION = "pig.spark.version"; + + private static SparkShims sparkShims; + + private static SparkShims loadShims(String sparkVersion) throws ReflectiveOperationException { + Class<?> sparkShimsClass; + + if ("2".equals(sparkVersion)) { + LOG.info("Initializing shims for Spark 2.x"); + sparkShimsClass = Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark2Shims"); + } else { + LOG.info("Initializing shims for Spark 1.x"); + sparkShimsClass = Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark1Shims"); + } + + Constructor c = sparkShimsClass.getConstructor(); + return (SparkShims) c.newInstance(); + } + + public static SparkShims getInstance() { + if (sparkShims == null) { + String sparkVersion = UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, ""); + LOG.info("Initializing SparkShims for Spark version: " + sparkVersion); + String sparkMajorVersion = getSparkMajorVersion(sparkVersion); + try { + sparkShims = loadShims(sparkMajorVersion); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + return sparkShims; + } + + private static String getSparkMajorVersion(String sparkVersion) { + return sparkVersion.startsWith("2") ? 
"2" : "1"; + } + + public abstract <T, R> FlatMapFunction<T, R> flatMapFunction(FlatMapFunctionAdapter<T, R> function); + + public abstract <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(PairFlatMapFunctionAdapter<T, K, V> function); + + public abstract RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle); + + public abstract SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf); + + public abstract SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf); + + public abstract <T> OptionalWrapper<T> wrapOptional(T tuple); + + public abstract SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId, + Map<Integer, Integer> stageIdToJobId, + Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics, + Set<Integer> finishedJobIds); + + public abstract void addSparkListener(SparkContext sc, SparkListener sparkListener); + + public interface OptionalWrapper<T> { + boolean isPresent(); + + T get(); + } +} \ No newline at end of file Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Wed Jul 19 01:32:41 2017 @@ -24,9 +24,10 @@ import java.util.List; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; @SuppressWarnings({"serial"}) @@ -39,48 +40,40 @@ public class CollectedGroupConverter imp RDD<Tuple> rdd = predecessors.get(0); CollectedGroupFunction collectedGroupFunction = new CollectedGroupFunction(physicalOperator); - return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true) + return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(collectedGroupFunction), true) .rdd(); } private static class CollectedGroupFunction - implements FlatMapFunction<Iterator<Tuple>, Tuple> { + implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> { private POCollectedGroup poCollectedGroup; public long current_val; - public boolean proceed; private CollectedGroupFunction(POCollectedGroup poCollectedGroup) { this.poCollectedGroup = poCollectedGroup; this.current_val = 0; } - public Iterable<Tuple> call(final Iterator<Tuple> input) { - - return new Iterable<Tuple>() { + @Override + public Iterator<Tuple> call(final Iterator<Tuple> input) { + return new OutputConsumerIterator(input) { @Override - public Iterator<Tuple> iterator() { + protected void attach(Tuple tuple) { + poCollectedGroup.setInputs(null); + poCollectedGroup.attachInput(tuple); + } - return new OutputConsumerIterator(input) { + @Override + 
protected Result getNextResult() throws ExecException { + return poCollectedGroup.getNextTuple(); + } - @Override - protected void attach(Tuple tuple) { - poCollectedGroup.setInputs(null); - poCollectedGroup.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - return poCollectedGroup.getNextTuple(); - } - - @Override - protected void endOfInput() { - poCollectedGroup.setEndOfInput(true); - } - }; + @Override + protected void endOfInput() { + poCollectedGroup.setEndOfInput(true); } }; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Wed Jul 19 01:32:41 2017 @@ -32,10 +32,11 @@ import org.apache.commons.logging.LogFac import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; @SuppressWarnings("serial") @@ -53,7 +54,7 @@ public class FRJoinConverter implements attachReplicatedInputs((POFRJoinSpark) poFRJoin); FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin); - return rdd.toJavaRDD().mapPartitions(frJoinFunction, true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(frJoinFunction), true).rdd(); } private void attachReplicatedInputs(POFRJoinSpark poFRJoin) { @@ -67,7 +68,7 @@ public class FRJoinConverter implements } private static class FRJoinFunction implements - FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable { private POFRJoin poFRJoin; private FRJoinFunction(POFRJoin poFRJoin) { @@ -75,29 +76,22 @@ public class FRJoinConverter implements } @Override - public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception { + public Iterator<Tuple> call(final Iterator<Tuple> input) { + return new OutputConsumerIterator(input) { - return new Iterable<Tuple>() { + @Override + protected void attach(Tuple tuple) { + poFRJoin.setInputs(null); + poFRJoin.attachInput(tuple); + } @Override - public Iterator<Tuple> iterator() { - return new OutputConsumerIterator(input) { + protected Result getNextResult() throws ExecException { + return poFRJoin.getNextTuple(); + } - @Override - protected void attach(Tuple tuple) { - poFRJoin.setInputs(null); - poFRJoin.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - return poFRJoin.getNextTuple(); - } - - @Override - protected void endOfInput() { - } - }; + @Override + protected void endOfInput() { } }; } @@ -107,4 +101,4 @@ public class 
FRJoinConverter implements public void setReplicatedInputs(Set<String> replicatedInputs) { this.replicatedInputs = replicatedInputs; } -} \ No newline at end of file +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Wed Jul 19 01:32:41 2017 @@ -29,13 +29,14 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.util.ObjectSerializer; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; /** @@ -60,11 +61,11 @@ public class ForEachConverter implements RDD<Tuple> rdd = predecessors.get(0); ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes); - return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(forEachFunction), true).rdd(); } private static class ForEachFunction implements - FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable { private POForEach poForEach; private byte[] confBytes; @@ -75,7 +76,8 @@ public class ForEachConverter implements this.confBytes = confBytes; } - public Iterable<Tuple> call(final Iterator<Tuple> input) { + @Override + public Iterator<Tuple> call(final Iterator<Tuple> input) { initialize(); @@ -90,29 +92,21 @@ public class ForEachConverter implements } } } + return new OutputConsumerIterator(input) { - - return new Iterable<Tuple>() { + @Override + protected void attach(Tuple tuple) { + poForEach.setInputs(null); + poForEach.attachInput(tuple); + } @Override - public Iterator<Tuple> iterator() { - return new OutputConsumerIterator(input) { + protected Result getNextResult() throws ExecException { + return poForEach.getNextTuple(); + } - @Override - protected void attach(Tuple tuple) { - poForEach.setInputs(null); - poForEach.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - return poForEach.getNextTuple(); - } - - @Override - protected void endOfInput() { - } - }; + @Override + protected void endOfInput() { } }; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Wed Jul 19 01:32:41 2017 @@ -26,6 +26,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; @@ -34,7 +35,6 @@ import org.apache.pig.data.TupleFactory; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.rdd.CoGroupedRDD; import org.apache.spark.rdd.RDD; @@ -127,7 +127,7 @@ public class GlobalRearrangeConverter im } private static class RemoveValueFunction implements - FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable { + FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable { private class Tuple2TransformIterable implements Iterable<Tuple> { @@ -148,8 +148,8 @@ public class GlobalRearrangeConverter im } @Override - public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) { - return new Tuple2TransformIterable(input); + public Iterator<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) { + return new Tuple2TransformIterable(input).iterator(); } } @@ -330,7 +330,7 @@ public class GlobalRearrangeConverter im } } }); - ++ i; + ++i; } Tuple out = tf.newTuple(2); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Wed Jul 19 01:32:41 2017 @@ -24,9 +24,10 @@ import java.util.List; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; @SuppressWarnings({ "serial" }) @@ -38,11 +39,11 @@ public class LimitConverter implements R SparkUtil.assertPredecessorSize(predecessors, poLimit, 1); RDD<Tuple> 
rdd = predecessors.get(0); LimitFunction limitFunction = new LimitFunction(poLimit); - RDD<Tuple> rdd2 = rdd.coalesce(1, false, null); - return rdd2.toJavaRDD().mapPartitions(limitFunction, false).rdd(); + RDD<Tuple> rdd2 = SparkShims.getInstance().coalesce(rdd, 1, false); + return rdd2.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(limitFunction), false).rdd(); } - private static class LimitFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> { + private static class LimitFunction implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> { private final POLimit poLimit; @@ -51,28 +52,22 @@ public class LimitConverter implements R } @Override - public Iterable<Tuple> call(final Iterator<Tuple> tuples) { + public Iterator<Tuple> call(final Iterator<Tuple> tuples) { + return new OutputConsumerIterator(tuples) { - return new Iterable<Tuple>() { + @Override + protected void attach(Tuple tuple) { + poLimit.setInputs(null); + poLimit.attachInput(tuple); + } - public Iterator<Tuple> iterator() { - return new OutputConsumerIterator(tuples) { + @Override + protected Result getNextResult() throws ExecException { + return poLimit.getNextTuple(); + } - @Override - protected void attach(Tuple tuple) { - poLimit.setInputs(null); - poLimit.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - return poLimit.getNextTuple(); - } - - @Override - protected void endOfInput() { - } - }; + @Override + protected void endOfInput() { } }; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java Wed Jul 19 01:32:41 2017 @@ -24,9 +24,10 @@ import java.util.List; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; @@ -37,38 +38,32 @@ public class MergeCogroupConverter imple SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1); RDD<Tuple> rdd = predecessors.get(0); MergeCogroupFunction mergeCogroupFunction = new MergeCogroupFunction(physicalOperator); - return rdd.toJavaRDD().mapPartitions(mergeCogroupFunction, true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeCogroupFunction), true).rdd(); } private static class MergeCogroupFunction implements - FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable { private POMergeCogroup poMergeCogroup; @Override - public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception { - return new Iterable<Tuple>() { + 
public Iterator<Tuple> call(final Iterator<Tuple> input) { + return new OutputConsumerIterator(input) { @Override - public Iterator<Tuple> iterator() { - return new OutputConsumerIterator(input) { + protected void attach(Tuple tuple) { + poMergeCogroup.setInputs(null); + poMergeCogroup.attachInput(tuple); + } - @Override - protected void attach(Tuple tuple) { - poMergeCogroup.setInputs(null); - poMergeCogroup.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - return poMergeCogroup.getNextTuple(); - } - - @Override - protected void endOfInput() { - poMergeCogroup.setEndOfInput(true); - } - }; + @Override + protected Result getNextResult() throws ExecException { + return poMergeCogroup.getNextTuple(); + } + + @Override + protected void endOfInput() { + poMergeCogroup.setEndOfInput(true); } }; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Wed Jul 19 01:32:41 2017 @@ -25,9 +25,10 @@ import java.util.List; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; @@ -43,11 +44,11 @@ public class MergeJoinConverter implemen RDD<Tuple> rdd = predecessors.get(0); MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin); - return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeJoinFunction), true).rdd(); } private static class MergeJoinFunction implements - FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable { private POMergeJoin poMergeJoin; @@ -55,29 +56,24 @@ public class MergeJoinConverter implemen this.poMergeJoin = poMergeJoin; } - public Iterable<Tuple> call(final Iterator<Tuple> input) { + @Override + public Iterator<Tuple> call(final Iterator<Tuple> input) { + return new OutputConsumerIterator(input) { - return new Iterable<Tuple>() { @Override - public Iterator<Tuple> iterator() { - return new OutputConsumerIterator(input) { + protected void attach(Tuple tuple) { + poMergeJoin.setInputs(null); + poMergeJoin.attachInput(tuple); + } - @Override - protected void attach(Tuple tuple) { - poMergeJoin.setInputs(null); - poMergeJoin.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - return poMergeJoin.getNextTuple(); - } - - @Override - protected void endOfInput() { - poMergeJoin.setEndOfInput(true); - } - }; + @Override + protected Result getNextResult() throws 
ExecException { + return poMergeJoin.getNextTuple(); + } + + @Override + protected void endOfInput() { + poMergeJoin.setEndOfInput(true); } }; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java Wed Jul 19 01:32:41 2017 @@ -19,10 +19,11 @@ package org.apache.pig.backend.hadoop.ex import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; -import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark; import org.apache.pig.data.Tuple; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; import java.io.IOException; @@ -37,10 +38,10 @@ public class PoissonSampleConverter impl SparkUtil.assertPredecessorSize(predecessors, po, 1); RDD<Tuple> rdd = predecessors.get(0); PoissionSampleFunction poissionSampleFunction = new PoissionSampleFunction(po); - return rdd.toJavaRDD().mapPartitions(poissionSampleFunction, false).rdd(); + return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(poissionSampleFunction), false).rdd(); } - private static class PoissionSampleFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> { + private static class PoissionSampleFunction implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> { private final POPoissonSampleSpark po; @@ -49,29 +50,23 @@ public class PoissonSampleConverter impl } @Override - public Iterable<Tuple> call(final Iterator<Tuple> tuples) { + public Iterator<Tuple> call(final Iterator<Tuple> tuples) { + return new OutputConsumerIterator(tuples) { - return new Iterable<Tuple>() { + @Override + protected void attach(Tuple tuple) { + po.setInputs(null); + po.attachInput(tuple); + } - public Iterator<Tuple> iterator() { - return new OutputConsumerIterator(tuples) { + @Override + protected Result getNextResult() throws ExecException { + return po.getNextTuple(); + } - @Override - protected void attach(Tuple tuple) { - po.setInputs(null); - po.attachInput(tuple); - } - - @Override - protected Result getNextResult() throws ExecException { - return po.getNextTuple(); - } - - @Override - protected void endOfInput() { - po.setEndOfInput(true); - } - }; + @Override + protected void endOfInput() { + po.setEndOfInput(true); } }; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1802347&r1=1802346&r2=1802347&view=diff ============================================================================== --- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Wed Jul 19 01:32:41 2017 @@ -22,6 +22,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Objects; +import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import scala.Tuple2; import org.apache.commons.logging.Log; @@ -30,13 +33,11 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.rdd.RDD; /** @@ -56,13 +57,13 @@ public class SecondaryKeySortUtil { JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions( new IndexedKeyPartitioner(partitionNums)); //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...)) - return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd(); + return sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(new AccumulateByKey(pkgOp)), true).rdd(); } //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...)) //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the result - private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>, - Serializable { + private static class AccumulateByKey + implements FlatMapFunctionAdapter<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>, Serializable { private POPackage pkgOp; public AccumulateByKey(POPackage pkgOp) { @@ -70,7 +71,7 @@ public class SecondaryKeySortUtil { } @Override - public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception { + public Iterator<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) { return new Iterable<Tuple>() { Object curKey = null; ArrayList curValues = new ArrayList(); @@ -132,7 +133,7 @@ public class SecondaryKeySortUtil { } }; } - }; + }.iterator(); } private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {
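Note on the change that repeats across GlobalRearrangeConverter, LimitConverter, MergeCogroupConverter, MergeJoinConverter, PoissonSampleConverter and SecondaryKeySortUtil above: Spark 1.x's FlatMapFunction#call returns an Iterable, while Spark 2.x changed it to return an Iterator. To build against either version, each converter now implements the version-neutral FlatMapFunctionAdapter (always returning an Iterator) and SparkShims.getInstance().flatMapFunction(...) wraps that adapter into whichever FlatMapFunction the active Spark version expects. The bodies of FlatMapFunctionAdapter.java, Spark1Shims.java and Spark2Shims.java are not shown in these hunks, so the following is only a minimal sketch of how such an adapter and its shims could look; everything beyond the calls visible in the hunks (the generic signature, parameter names) is an assumption.

    // Hypothetical sketch, not the committed source.
    import java.io.Serializable;
    import java.util.Iterator;

    // Version-neutral contract used by the converters: always hand back an Iterator.
    public interface FlatMapFunctionAdapter<T, R> extends Serializable {
        Iterator<R> call(T input) throws Exception;
    }

    // Spark 1.x side (sketch): Spark 1's FlatMapFunction#call returns an Iterable,
    // so the adapter's Iterator is re-exposed through a one-shot Iterable.
    // Each shim class is compiled only against its matching Spark version.
    public class Spark1Shims extends SparkShims {
        @Override
        public <T, R> org.apache.spark.api.java.function.FlatMapFunction<T, R>
                flatMapFunction(final FlatMapFunctionAdapter<T, R> adapter) {
            return new org.apache.spark.api.java.function.FlatMapFunction<T, R>() {
                @Override
                public Iterable<R> call(final T input) throws Exception {
                    final Iterator<R> result = adapter.call(input);
                    return new Iterable<R>() {
                        @Override
                        public Iterator<R> iterator() {
                            return result;
                        }
                    };
                }
            };
        }
    }

    // Spark 2.x side (sketch): the signatures already agree, so delegate directly.
    public class Spark2Shims extends SparkShims {
        @Override
        public <T, R> org.apache.spark.api.java.function.FlatMapFunction<T, R>
                flatMapFunction(final FlatMapFunctionAdapter<T, R> adapter) {
            return new org.apache.spark.api.java.function.FlatMapFunction<T, R>() {
                @Override
                public Iterator<R> call(T input) throws Exception {
                    return adapter.call(input);
                }
            };
        }
    }

A side effect visible in the hunks: because the adapter already returns an Iterator, each converter's call() can return an OutputConsumerIterator directly instead of wrapping it in an anonymous Iterable, which is why those method bodies lose one level of nesting in this patch.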
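LimitConverter additionally replaces the direct rdd.coalesce(1, false, null) with SparkShims.getInstance().coalesce(rdd, 1, false), since the Scala RDD.coalesce signature gained a PartitionCoalescer parameter in Spark 2 and the old three-argument call no longer lines up. Only getInstance() and coalesce(rdd, numPartitions, shuffle) appear in these hunks; the dispatch mechanism and the exact Spark 2 call below are assumptions, sketched for orientation only.

    // Hypothetical sketch of the version dispatch and the coalesce shim.
    import org.apache.spark.rdd.RDD;

    public abstract class SparkShims {
        private static SparkShims instance;

        public static synchronized SparkShims getInstance() {
            if (instance == null) {
                // One possible dispatch: choose the shim from the Spark version on the classpath.
                String version = org.apache.spark.package$.MODULE$.SPARK_VERSION();
                instance = version.startsWith("1.") ? new Spark1Shims() : new Spark2Shims();
            }
            return instance;
        }

        public abstract <T, R> org.apache.spark.api.java.function.FlatMapFunction<T, R>
                flatMapFunction(FlatMapFunctionAdapter<T, R> adapter);

        public abstract <T> RDD<T> coalesce(RDD<T> rdd, int numPartitions, boolean shuffle);

        // Possible per-version bodies (assumptions, not the committed code):
        //   Spark 1.x: return rdd.coalesce(numPartitions, shuffle, null);
        //   Spark 2.x: return rdd.coalesce(numPartitions, shuffle, scala.Option.empty(), null);
    }

Keeping the version split behind a single getInstance() means the converters themselves stay free of Spark-version checks; only the shim classes differ between the two builds.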