Author: zly
Date: Wed Jul 19 01:32:41 2017
New Revision: 1802347
URL: http://svn.apache.org/viewvc?rev=1802347&view=rev
Log:
PIG-5157: Upgrade to Spark 2.0 (nkollar via liyunzhang)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java
pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java
Removed:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
Modified:
pig/trunk/build.xml
pig/trunk/ivy.xml
pig/trunk/ivy/libraries.properties
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Wed Jul 19 01:32:41 2017
@@ -207,8 +207,7 @@
<property name="ivy.repo.dir" value="${user.home}/ivyrepo" />
<property name="ivy.dir" location="ivy" />
<property name="loglevel" value="quiet" />
- <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
-
+ <loadproperties srcfile="${ivy.dir}/libraries.properties" />
<!--
Hadoop master version
@@ -241,6 +240,11 @@
</then>
</if>
<property name="hbaseversion" value="1" />
+ <property name="sparkversion" value="1" />
+
+ <condition property="src.exclude.dir" value="**/Spark2*.java" else="**/Spark1*.java">
+ <equals arg1="${sparkversion}" arg2="1"/>
+ </condition>
<property name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
<property name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
@@ -556,7 +560,7 @@
<echo>*** Building Main Sources ***</echo>
<echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
<echo>*** Else, you will only be warned about deprecations ***</echo>
- <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ***</echo>
+ <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ; Spark version used: ${sparkversion} ***</echo>
<compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/bzip2;${src.shims.dir}" excludes="${src.exclude.dir}" dist="${build.classes}" cp="classpath" warnings="${javac.args.warnings}" />
<copy todir="${build.classes}/META-INF">
@@ -688,7 +692,7 @@
<!-- ================================================================== -->
<!-- Make pig.jar -->
<!-- ================================================================== -->
- <target name="jar" depends="compile,ivy-buildJar" description="Create pig core jar">
+ <target name="jar-simple" depends="compile,ivy-buildJar" description="Create pig core jar">
<buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/>
<buildJar svnString="${svn.revision}" outputFile="${output.jarfile.withouthadoop}" includedJars="runtime.dependencies-withouthadoop.jar"/>
<antcall target="copyCommonDependencies"/>
@@ -788,6 +792,35 @@
</sequential>
</macrodef>
+ <target name="jar-core" depends="compile,ivy-buildJar" description="Create only pig core jar">
+ <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/>
+ </target>
+
+ <target name="jar" description="Create pig jar with Spark 1 and 2">
+ <echo>Compiling against Spark 2</echo>
+ <antcall target="clean" inheritRefs="true" inheritall="true"/>
+ <propertyreset name="sparkversion" value="2"/>
+ <propertyreset name="src.exclude.dir" value="**/Spark1*.java" />
+ <antcall target="jar-core" inheritRefs="true" inheritall="true"/>
+ <move file="${output.jarfile.core}" tofile="${basedir}/_pig-shims.jar"/>
+
+ <echo>Compiling against Spark 1</echo>
+ <antcall target="clean" inheritRefs="true" inheritall="true"/>
+ <propertyreset name="sparkversion" value="1"/>
+ <propertyreset name="src.exclude.dir" value="**/Spark2*.java" />
+ <antcall target="jar-simple" inheritRefs="true" inheritall="true"/>
+ <jar update="yes" jarfile="${output.jarfile.core}">
+ <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/>
+ </jar>
+ <jar update="yes" jarfile="${output.jarfile.backcompat-core-h2}">
+ <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/>
+ </jar>
+ <jar update="yes" jarfile="${output.jarfile.withouthadoop-h2}">
+ <zipfileset src="${basedir}/_pig-shims.jar" includes="**/Spark2*.class"/>
+ </jar>
+ <delete file="${basedir}/_pig-shims.jar"/>
+ </target>
+
<!-- ================================================================== -->
<!-- macrodef: buildJar -->
<!-- ================================================================== -->
@@ -1655,7 +1688,7 @@
<target name="ivy-resolve" depends="ivy-init" unless="ivy.resolved" description="Resolve Ivy dependencies">
<property name="ivy.resolved" value="true"/>
- <echo>*** Ivy resolve with Hadoop ${hadoopversion} and HBase ${hbaseversion} ***</echo>
+ <echo>*** Ivy resolve with Hadoop ${hadoopversion}, Spark ${sparkversion} and HBase ${hbaseversion} ***</echo>
<ivy:resolve log="${loglevel}" settingsRef="${ant.project.name}.ivy.settings" conf="compile"/>
<ivy:report toDir="build/ivy/report"/>
</target>
@@ -1664,7 +1697,7 @@
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
- pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark"/>
+ pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion}"/>
<ivy:cachepath pathid="compile.classpath" conf="compile"/>
</target>
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Wed Jul 19 01:32:41 2017
@@ -40,7 +40,8 @@
<conf name="buildJar" extends="compile,test" visibility="private"/>
<conf name="hadoop2" visibility="private"/>
<conf name="hbase1" visibility="private"/>
- <conf name="spark" visibility="private" />
+ <conf name="spark1" visibility="private" />
+ <conf name="spark2" visibility="private" />
</configurations>
<publications>
<artifact name="pig" conf="master"/>
@@ -407,8 +408,8 @@
<dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
- <!-- for Spark integration -->
- <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark.version}" conf="spark->default">
+ <!-- for Spark 1.x integration -->
+ <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark1.version}" conf="spark1->default">
<exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
<exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
<exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
@@ -418,12 +419,28 @@
<exclude org="jline" module="jline"/>
<exclude org="com.google.guava" />
</dependency>
- <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark.version}" conf="spark->default">
+ <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark1.version}" conf="spark1->default">
<exclude org="org.apache.hadoop" />
</dependency>
+
+ <!-- for Spark 2.x integration -->
+ <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark2.version}" conf="spark2->default">
+ <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
+ <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
+ <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
+ <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/>
+ <exclude org="org.apache.hadoop" />
+ <exclude org="com.esotericsoftware.kryo" />
+ <exclude org="jline" module="jline"/>
+ <exclude org="com.google.guava" />
+ </dependency>
+ <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark2.version}" conf="spark2->default">
+ <exclude org="org.apache.hadoop" />
+ </dependency>
+
<dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/>
- <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark->default"/>
- <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark->default"/>
+ <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark1->default;spark2->default"/>
+ <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark1->default;spark2->default"/>
<!-- for Tez integration -->
<dependency org="org.apache.tez" name="tez" rev="${tez.version}"
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Wed Jul 19 01:32:41 2017
@@ -73,7 +73,8 @@ netty-all.version=4.0.23.Final
rats-lib.version=0.5.1
slf4j-api.version=1.6.1
slf4j-log4j12.version=1.6.1
-spark.version=1.6.1
+spark1.version=1.6.1
+spark2.version=2.1.1
xerces.version=2.10.0
xalan.version=2.7.1
wagon-http.version=1.0-beta-2
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/FlatMapFunctionAdapter.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+public interface FlatMapFunctionAdapter<R, T> extends Serializable {
+ Iterator<T> call(final R r) throws Exception;
+}
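For context on this new interface: Spark 1.x declares FlatMapFunction.call(T) as returning Iterable<R>, while Spark 2.x changed the return type to Iterator<R>, so a single compiled class cannot implement both APIs. FlatMapFunctionAdapter pins the contract to Iterator and leaves the version-specific wrapping to the shims. A minimal sketch of how a converter plugs in (the passThrough method and its identity body are illustrative only, not part of this patch):

    // Sketch: a version-neutral partition function, bridged by the shim layer.
    static JavaRDD<Tuple> passThrough(RDD<Tuple> rdd) {
        FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> fn =
                new FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>() {
                    @Override
                    public Iterator<Tuple> call(Iterator<Tuple> input) {
                        return input; // identity; real converters drive a PhysicalOperator here
                    }
                };
        // Spark1Shims wraps fn in a FlatMapFunction returning Iterable,
        // Spark2Shims in one returning Iterator, matching each API.
        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(fn), true);
    }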
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Wed Jul 19 01:32:41 2017
@@ -81,7 +81,7 @@ public class JobGraphBuilder extends Spa
private Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap = null;
private SparkPigStats sparkStats = null;
private JavaSparkContext sparkContext = null;
- private JobMetricsListener jobMetricsListener = null;
+ private JobStatisticCollector jobStatisticCollector = null;
private String jobGroupID = null;
private Set<Integer> seenJobIDs = new HashSet<Integer>();
private SparkOperPlan sparkPlan = null;
@@ -91,14 +91,14 @@ public class JobGraphBuilder extends Spa
private PigContext pc;
public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
- SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener
- jobMetricsListener, String jobGroupID, JobConf jobConf, PigContext pc) {
+ SparkPigStats sparkStats, JavaSparkContext sparkContext, JobStatisticCollector
+ jobStatisticCollector, String jobGroupID, JobConf jobConf, PigContext pc) {
super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
this.sparkPlan = plan;
this.convertMap = convertMap;
this.sparkStats = sparkStats;
this.sparkContext = sparkContext;
- this.jobMetricsListener = jobMetricsListener;
+ this.jobStatisticCollector = jobStatisticCollector;
this.jobGroupID = jobGroupID;
this.jobConf = jobConf;
this.pc = pc;
@@ -223,7 +223,7 @@ public class JobGraphBuilder extends Spa
}
}
SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
- jobMetricsListener, sparkContext, sparkStats);
+ jobStatisticCollector, sparkContext, sparkStats);
}
} else {
for (POStore poStore : poStores) {
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobStatisticCollector.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class JobStatisticCollector {
+
+ private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+ private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+ private final Map<Integer, Map<String, List<TaskMetrics>>> allJobStatistics = Maps.newHashMap();
+ private final Set<Integer> finishedJobIds = Sets.newHashSet();
+
+ private SparkListener sparkListener;
+
+ public SparkListener getSparkListener() {
+ if (sparkListener == null) {
+ sparkListener = SparkShims.getInstance()
+ .getJobMetricsListener(jobIdToStageId, stageIdToJobId, allJobStatistics, finishedJobIds);
+ }
+ return sparkListener;
+ }
+
+ public Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+ synchronized (sparkListener) {
+ return allJobStatistics.get(jobId);
+ }
+ }
+
+ public boolean waitForJobToEnd(int jobId) throws InterruptedException {
+ synchronized (sparkListener) {
+ if (finishedJobIds.contains(jobId)) {
+ finishedJobIds.remove(jobId);
+ return true;
+ }
+
+ sparkListener.wait();
+ return false;
+ }
+ }
+
+ public void cleanup(int jobId) {
+ synchronized (sparkListener) {
+ allJobStatistics.remove(jobId);
+ jobIdToStageId.remove(jobId);
+ Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Integer> entry = iterator.next();
+ if (entry.getValue() == jobId) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ public void reset() {
+ synchronized (sparkListener) {
+ stageIdToJobId.clear();
+ jobIdToStageId.clear();
+ allJobStatistics.clear();
+ finishedJobIds.clear();
+ }
+ }
+}
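Usage note: waitForJobToEnd(jobId) returns true only when the job id is already in the finished set; otherwise it parks on the listener monitor (which the shim listeners notify() from onJobEnd) and returns false, so callers are expected to loop. A hedged sketch of that consumption pattern (the helper name awaitJobMetrics is invented for illustration; the collector's listener is assumed to be registered on the SparkContext beforehand, as SparkLauncher does):

    // Illustrative only: block until the given Spark job finishes, then
    // read and clear its per-stage task metrics.
    static Map<String, List<TaskMetrics>> awaitJobMetrics(
            JobStatisticCollector collector, int jobId) throws InterruptedException {
        while (!collector.waitForJobToEnd(jobId)) {
            // woken by some other job ending; keep waiting for this one
        }
        Map<String, List<TaskMetrics>> metrics = collector.getJobMetric(jobId);
        collector.cleanup(jobId); // drop per-job bookkeeping once consumed
        return metrics;
    }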
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/PairFlatMapFunctionAdapter.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+public interface PairFlatMapFunctionAdapter<T, K, V> extends Serializable {
+ Iterator<Tuple2<K, V>> call(T t) throws Exception;
+}
\ No newline at end of file
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark1Shims.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkJobStats;
+import org.apache.pig.tools.pigstats.spark.Spark1JobStats;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerBlockUpdated;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class Spark1Shims extends SparkShims {
+ @Override
+ public <T, R> FlatMapFunction<T, R> flatMapFunction(final FlatMapFunctionAdapter<T, R> function) {
+ return new FlatMapFunction<T, R>() {
+ @Override
+ public Iterable<R> call(final T t) throws Exception {
+ return new Iterable<R>() {
+ @Override
+ public Iterator<R> iterator() {
+ try {
+ return function.call(t);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ }
+ };
+ }
+
+ @Override
+ public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) {
+ return new PairFlatMapFunction<T, K, V>() {
+ @Override
+ public Iterable<Tuple2<K, V>> call(final T t) throws Exception {
+ return new Iterable<Tuple2<K, V>>() {
+ @Override
+ public Iterator<Tuple2<K, V>> iterator() {
+ try {
+ return function.call(t);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ }
+ };
+ }
+
+ @Override
+ public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle) {
+ return rdd.coalesce(numPartitions, shuffle, null);
+ }
+
+ @Override
+ public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+ return new Spark1JobStats(jobId, plan, conf);
+ }
+
+ @Override
+ public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+ return new Spark1JobStats(jobId, plan, conf);
+ }
+
+ @Override
+ public <T> OptionalWrapper<T> wrapOptional(T tuple) {
+ final Optional<T> t = (Optional<T>) tuple;
+
+ return new OptionalWrapper<T>() {
+ @Override
+ public boolean isPresent() {
+ return t.isPresent();
+ }
+
+ @Override
+ public T get() {
+ return t.get();
+ }
+ };
+ }
+
+ private static class JobMetricsListener implements SparkListener {
+ private final Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+ private Map<Integer, int[]> jobIdToStageId;
+ private Map<Integer, Integer> stageIdToJobId;
+ private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics;
+ private Set<Integer> finishedJobIds;
+
+ JobMetricsListener(final Map<Integer, int[]> jobIdToStageId,
+ final Map<Integer, Integer> stageIdToJobId,
+ final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics,
+ final Set<Integer> finishedJobIds) {
+ this.jobIdToStageId = jobIdToStageId;
+ this.stageIdToJobId = stageIdToJobId;
+ this.allJobMetrics = allJobMetrics;
+ this.finishedJobIds = finishedJobIds;
+ }
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+ }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+ }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) {
+ }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+ }
+
+ @Override
+ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
+ }
+
+ @Override
+ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
+ }
+
+ @Override
+ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
+ }
+
+ @Override
+ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ int stageId = taskEnd.stageId();
+ int stageAttemptId = taskEnd.stageAttemptId();
+ String stageIdentifier = stageId + "_" + stageAttemptId;
+ Integer jobId = stageIdToJobId.get(stageId);
+ if (jobId == null) {
+ LOG.warn("Cannot find job id for stage[" + stageId + "].");
+ } else {
+ Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+ if (jobMetrics == null) {
+ jobMetrics = Maps.newHashMap();
+ allJobMetrics.put(jobId, jobMetrics);
+ }
+ List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+ if (stageMetrics == null) {
+ stageMetrics = Lists.newLinkedList();
+ jobMetrics.put(stageIdentifier, stageMetrics);
+ }
+ stageMetrics.add(taskEnd.taskMetrics());
+ }
+ }
+
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ int jobId = jobStart.jobId();
+ int size = jobStart.stageIds().size();
+ int[] intStageIds = new int[size];
+ for (int i = 0; i < size; i++) {
+ Integer stageId = (Integer) jobStart.stageIds().apply(i);
+ intStageIds[i] = stageId;
+ stageIdToJobId.put(stageId, jobId);
+ }
+ jobIdToStageId.put(jobId, intStageIds);
+ }
+
+ @Override
+ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+ finishedJobIds.add(jobEnd.jobId());
+ notify();
+ }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+ }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+ }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+ }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+ }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+ }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+ }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+ }
+ }
+
+ @Override
+ public SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId,
+ Map<Integer, Integer> stageIdToJobId,
+ Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics,
+ Set<Integer> finishedJobIds) {
+ return new JobMetricsListener(jobIdToStageId, stageIdToJobId, allJobMetrics, finishedJobIds);
+ }
+
+ @Override
+ public void addSparkListener(SparkContext sc, SparkListener sparkListener) {
+ sc.addSparkListener(sparkListener);
+ }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/Spark2Shims.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.Spark2JobStats;
+import org.apache.pig.tools.pigstats.spark.SparkJobStats;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.rdd.PartitionCoalescer;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import scala.Option;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class Spark2Shims extends SparkShims {
+ @Override
+ public <T, R> FlatMapFunction flatMapFunction(final FlatMapFunctionAdapter<T, R> function) {
+ return new FlatMapFunction<T, R>() {
+ @Override
+ public Iterator<R> call(T t) throws Exception {
+ return function.call(t);
+ }
+ };
+ }
+
+ @Override
+ public <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(final PairFlatMapFunctionAdapter<T, K, V> function) {
+ return new PairFlatMapFunction<T, K, V>() {
+ @Override
+ public Iterator<Tuple2<K, V>> call(T t) throws Exception {
+ return function.call(t);
+ }
+ };
+ }
+
+ @Override
+ public RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle) {
+ return rdd.coalesce(numPartitions, shuffle, Option.<PartitionCoalescer>empty(), null);
+ }
+
+ @Override
+ public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+ return new Spark2JobStats(jobId, plan, conf);
+ }
+
+ @Override
+ public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+ return new Spark2JobStats(jobId, plan, conf);
+ }
+
+ @Override
+ public <T> OptionalWrapper<T> wrapOptional(T tuple) {
+ final Optional<T> t = (Optional<T>) tuple;
+
+ return new OptionalWrapper<T>() {
+ @Override
+ public boolean isPresent() {
+ return t.isPresent();
+ }
+
+ @Override
+ public T get() {
+ return t.get();
+ }
+ };
+ }
+
+ private static class JobMetricsListener extends SparkListener {
+ private final Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+ private Map<Integer, int[]> jobIdToStageId;
+ private Map<Integer, Integer> stageIdToJobId;
+ private Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics;
+ private Set<Integer> finishedJobIds;
+
+ JobMetricsListener(final Map<Integer, int[]> jobIdToStageId,
+ final Map<Integer, Integer> stageIdToJobId,
+ final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics,
+ final Set<Integer> finishedJobIds) {
+ this.jobIdToStageId = jobIdToStageId;
+ this.stageIdToJobId = stageIdToJobId;
+ this.allJobMetrics = allJobMetrics;
+ this.finishedJobIds = finishedJobIds;
+ }
+
+ @Override
+ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+ int stageId = stageCompleted.stageInfo().stageId();
+ int stageAttemptId = stageCompleted.stageInfo().attemptId();
+ String stageIdentifier = stageId + "_" + stageAttemptId;
+ Integer jobId = stageIdToJobId.get(stageId);
+ if (jobId == null) {
+ LOG.warn("Cannot find job id for stage[" + stageId + "].");
+ } else {
+ Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+ if (jobMetrics == null) {
+ jobMetrics = Maps.newHashMap();
+ allJobMetrics.put(jobId, jobMetrics);
+ }
+ List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+ if (stageMetrics == null) {
+ stageMetrics = Lists.newLinkedList();
+ jobMetrics.put(stageIdentifier, stageMetrics);
+ }
+ stageMetrics.add(stageCompleted.stageInfo().taskMetrics());
+ }
+ }
+
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ int jobId = jobStart.jobId();
+ int size = jobStart.stageIds().size();
+ int[] intStageIds = new int[size];
+ for (int i = 0; i < size; i++) {
+ Integer stageId = (Integer) jobStart.stageIds().apply(i);
+ intStageIds[i] = stageId;
+ stageIdToJobId.put(stageId, jobId);
+ }
+ jobIdToStageId.put(jobId, intStageIds);
+ }
+
+ @Override
+ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+ finishedJobIds.add(jobEnd.jobId());
+ notify();
+ }
+ }
+
+ @Override
+ public SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId,
+ Map<Integer, Integer> stageIdToJobId,
+ Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics,
+ Set<Integer> finishedJobIds) {
+ return new JobMetricsListener(jobIdToStageId, stageIdToJobId, allJobMetrics, finishedJobIds);
+ }
+
+ @Override
+ public void addSparkListener(SparkContext sc, SparkListener sparkListener) {
+ sc.addSparkListener(sparkListener);
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Jul 19 01:32:41 2017
@@ -136,11 +136,12 @@ import org.apache.pig.tools.pigstats.spa
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
import com.google.common.base.Joiner;
+import static org.apache.pig.backend.hadoop.executionengine.spark.SparkShims.SPARK_VERSION;
+
/**
* Main class that launches pig for Spark
*/
@@ -152,7 +153,7 @@ public class SparkLauncher extends Launc
// across jobs, because a
// new SparkLauncher gets created for each job.
private static JavaSparkContext sparkContext = null;
- private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
+ private static JobStatisticCollector jobStatisticCollector = new JobStatisticCollector();
private String jobGroupID;
private PigContext pigContext = null;
private JobConf jobConf = null;
@@ -174,9 +175,10 @@ public class SparkLauncher extends Launc
SparkPigStats sparkStats = (SparkPigStats) pigContext
.getExecutionEngine().instantiatePigStats();
sparkStats.initialize(pigContext, sparkplan, jobConf);
+ UDFContext.getUDFContext().addJobConf(jobConf);
PigStats.start(sparkStats);
- startSparkIfNeeded(pigContext);
+ startSparkIfNeeded(jobConf, pigContext);
jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(),
UUID.randomUUID().toString());
@@ -184,7 +186,7 @@ public class SparkLauncher extends Launc
sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
false);
- jobMetricsListener.reset();
+ jobStatisticCollector.reset();
this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
.normalize().toString()
@@ -231,7 +233,7 @@ public class SparkLauncher extends Launc
}
uploadResources(sparkplan);
- new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
+ new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobStatisticCollector, jobGroupID, jobConf, pigContext).visit();
cleanUpSparkJob(sparkStats);
sparkStats.finish();
resetUDFContext();
@@ -539,7 +541,7 @@ public class SparkLauncher extends Launc
* Only one SparkContext may be active per JVM (SPARK-2243). When multiple threads start SparkLaucher,
* the static member sparkContext should be initialized only once
*/
- private static synchronized void startSparkIfNeeded(PigContext pc) throws PigException {
+ private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext pc) throws PigException {
if (sparkContext == null) {
String master = null;
if (pc.getExecType().isLocal()) {
@@ -594,9 +596,9 @@ public class SparkLauncher extends Launc
checkAndConfigureDynamicAllocation(master, sparkConf);
sparkContext = new JavaSparkContext(sparkConf);
- sparkContext.sc().addSparkListener(new StatsReportListener());
- sparkContext.sc().addSparkListener(new JobLogger());
- sparkContext.sc().addSparkListener(jobMetricsListener);
+ jobConf.set(SPARK_VERSION, sparkContext.version());
+ SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener());
+ SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener());
}
}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkJobStats;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.scheduler.SparkListener;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class SparkShims implements Serializable {
+ private static final Log LOG = LogFactory.getLog(SparkShims.class);
+ public static final String SPARK_VERSION = "pig.spark.version";
+
+ private static SparkShims sparkShims;
+
+ private static SparkShims loadShims(String sparkVersion) throws ReflectiveOperationException {
+ Class<?> sparkShimsClass;
+
+ if ("2".equals(sparkVersion)) {
+ LOG.info("Initializing shims for Spark 2.x");
+ sparkShimsClass = Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark2Shims");
+ } else {
+ LOG.info("Initializing shims for Spark 1.x");
+ sparkShimsClass = Class.forName("org.apache.pig.backend.hadoop.executionengine.spark.Spark1Shims");
+ }
+
+ Constructor c = sparkShimsClass.getConstructor();
+ return (SparkShims) c.newInstance();
+ }
+
+ public static SparkShims getInstance() {
+ if (sparkShims == null) {
+ String sparkVersion = UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, "");
+ LOG.info("Initializing SparkShims for Spark version: " + sparkVersion);
+ String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
+ try {
+ sparkShims = loadShims(sparkMajorVersion);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return sparkShims;
+ }
+
+ private static String getSparkMajorVersion(String sparkVersion) {
+ return sparkVersion.startsWith("2") ? "2" : "1";
+ }
+
+ public abstract <T, R> FlatMapFunction<T, R> flatMapFunction(FlatMapFunctionAdapter<T, R> function);
+
+ public abstract <T, K, V> PairFlatMapFunction<T, K, V> pairFlatMapFunction(PairFlatMapFunctionAdapter<T, K, V> function);
+
+ public abstract RDD<Tuple> coalesce(RDD<Tuple> rdd, int numPartitions, boolean shuffle);
+
+ public abstract SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf);
+
+ public abstract SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf);
+
+ public abstract <T> OptionalWrapper<T> wrapOptional(T tuple);
+
+ public abstract SparkListener getJobMetricsListener(Map<Integer, int[]> jobIdToStageId,
+ Map<Integer, Integer> stageIdToJobId,
+ Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics,
+ Set<Integer> finishedJobIds);
+
+ public abstract void addSparkListener(SparkContext sc, SparkListener sparkListener);
+
+ public interface OptionalWrapper<T> {
+ boolean isPresent();
+
+ T get();
+ }
+}
\ No newline at end of file
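How a shim gets selected at runtime: the SparkLauncher change above stamps the live context's version string into the job configuration under pig.spark.version and registers that configuration with UDFContext, so the first getInstance() call on any JVM can resolve the major version and reflectively load Spark1Shims or Spark2Shims. A minimal sketch of that driver-side handshake, with the method name invented for illustration:

    // Sketch only: mirrors what the SparkLauncher diff above does inline.
    static void publishSparkVersion(JavaSparkContext sparkContext, JobConf jobConf) {
        jobConf.set(SparkShims.SPARK_VERSION, sparkContext.version()); // e.g. "2.1.1"
        UDFContext.getUDFContext().addJobConf(jobConf);
        // From here on, SparkShims.getInstance() maps a version starting with
        // "2" to Spark2Shims and anything else to Spark1Shims via Class.forName.
    }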
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Wed Jul 19 01:32:41 2017
@@ -24,9 +24,10 @@ import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@SuppressWarnings({"serial"})
@@ -39,48 +40,40 @@ public class CollectedGroupConverter imp
RDD<Tuple> rdd = predecessors.get(0);
CollectedGroupFunction collectedGroupFunction
= new CollectedGroupFunction(physicalOperator);
- return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true)
+ return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(collectedGroupFunction), true)
.rdd();
}
private static class CollectedGroupFunction
- implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+ implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> {
private POCollectedGroup poCollectedGroup;
public long current_val;
- public boolean proceed;
private CollectedGroupFunction(POCollectedGroup poCollectedGroup) {
this.poCollectedGroup = poCollectedGroup;
this.current_val = 0;
}
- public Iterable<Tuple> call(final Iterator<Tuple> input) {
-
- return new Iterable<Tuple>() {
+ @Override
+ public Iterator<Tuple> call(final Iterator<Tuple> input) {
+ return new OutputConsumerIterator(input) {
@Override
- public Iterator<Tuple> iterator() {
+ protected void attach(Tuple tuple) {
+ poCollectedGroup.setInputs(null);
+ poCollectedGroup.attachInput(tuple);
+ }
- return new OutputConsumerIterator(input) {
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return poCollectedGroup.getNextTuple();
+ }
- @Override
- protected void attach(Tuple tuple) {
- poCollectedGroup.setInputs(null);
- poCollectedGroup.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return poCollectedGroup.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- poCollectedGroup.setEndOfInput(true);
- }
- };
+ @Override
+ protected void endOfInput() {
+ poCollectedGroup.setEndOfInput(true);
}
};
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Wed Jul 19 01:32:41 2017
@@ -32,10 +32,11 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@SuppressWarnings("serial")
@@ -53,7 +54,7 @@ public class FRJoinConverter implements
attachReplicatedInputs((POFRJoinSpark) poFRJoin);
FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin);
- return rdd.toJavaRDD().mapPartitions(frJoinFunction, true).rdd();
+ return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(frJoinFunction), true).rdd();
}
private void attachReplicatedInputs(POFRJoinSpark poFRJoin) {
@@ -67,7 +68,7 @@ public class FRJoinConverter implements
}
private static class FRJoinFunction implements
- FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable {
private POFRJoin poFRJoin;
private FRJoinFunction(POFRJoin poFRJoin) {
@@ -75,29 +76,22 @@ public class FRJoinConverter implements
}
@Override
- public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception {
+ public Iterator<Tuple> call(final Iterator<Tuple> input) {
+ return new OutputConsumerIterator(input) {
- return new Iterable<Tuple>() {
+ @Override
+ protected void attach(Tuple tuple) {
+ poFRJoin.setInputs(null);
+ poFRJoin.attachInput(tuple);
+ }
@Override
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(input) {
+ protected Result getNextResult() throws ExecException {
+ return poFRJoin.getNextTuple();
+ }
- @Override
- protected void attach(Tuple tuple) {
- poFRJoin.setInputs(null);
- poFRJoin.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return poFRJoin.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- }
- };
+ @Override
+ protected void endOfInput() {
}
};
}
@@ -107,4 +101,4 @@ public class FRJoinConverter implements
public void setReplicatedInputs(Set<String> replicatedInputs) {
this.replicatedInputs = replicatedInputs;
}
-}
\ No newline at end of file
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Wed Jul 19 01:32:41 2017
@@ -29,13 +29,14 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
/**
@@ -60,11 +61,11 @@ public class ForEachConverter implements
RDD<Tuple> rdd = predecessors.get(0);
ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes);
- return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
+ return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(forEachFunction), true).rdd();
}
private static class ForEachFunction implements
- FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable {
private POForEach poForEach;
private byte[] confBytes;
@@ -75,7 +76,8 @@ public class ForEachConverter implements
this.confBytes = confBytes;
}
- public Iterable<Tuple> call(final Iterator<Tuple> input) {
+ @Override
+ public Iterator<Tuple> call(final Iterator<Tuple> input) {
initialize();
@@ -90,29 +92,21 @@ public class ForEachConverter implements
}
}
}
+ return new OutputConsumerIterator(input) {
-
- return new Iterable<Tuple>() {
+ @Override
+ protected void attach(Tuple tuple) {
+ poForEach.setInputs(null);
+ poForEach.attachInput(tuple);
+ }
@Override
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(input) {
+ protected Result getNextResult() throws ExecException {
+ return poForEach.getNextTuple();
+ }
- @Override
- protected void attach(Tuple tuple) {
- poForEach.setInputs(null);
- poForEach.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return poForEach.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- }
- };
+ @Override
+ protected void endOfInput() {
}
};
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Wed Jul 19 01:32:41 2017
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
@@ -34,7 +35,6 @@ import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
@@ -127,7 +127,7 @@ public class GlobalRearrangeConverter im
}
private static class RemoveValueFunction implements
- FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple> {
@@ -148,8 +148,8 @@ public class GlobalRearrangeConverter im
}
@Override
- public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
- return new Tuple2TransformIterable(input);
+ public Iterator<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+ return new Tuple2TransformIterable(input).iterator();
}
}
@@ -330,7 +330,7 @@ public class GlobalRearrangeConverter im
}
}
});
- ++ i;
+ ++i;
}
Tuple out = tf.newTuple(2);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Wed Jul 19 01:32:41 2017
@@ -24,9 +24,10 @@ import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@SuppressWarnings({ "serial" })
@@ -38,11 +39,11 @@ public class LimitConverter implements R
SparkUtil.assertPredecessorSize(predecessors, poLimit, 1);
RDD<Tuple> rdd = predecessors.get(0);
LimitFunction limitFunction = new LimitFunction(poLimit);
- RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
- return rdd2.toJavaRDD().mapPartitions(limitFunction, false).rdd();
+ RDD<Tuple> rdd2 = SparkShims.getInstance().coalesce(rdd, 1, false);
+        return rdd2.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(limitFunction), false).rdd();
}
-    private static class LimitFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+    private static class LimitFunction implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> {
private final POLimit poLimit;
@@ -51,28 +52,22 @@ public class LimitConverter implements R
}
@Override
- public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+ public Iterator<Tuple> call(final Iterator<Tuple> tuples) {
+ return new OutputConsumerIterator(tuples) {
- return new Iterable<Tuple>() {
+ @Override
+ protected void attach(Tuple tuple) {
+ poLimit.setInputs(null);
+ poLimit.attachInput(tuple);
+ }
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(tuples) {
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return poLimit.getNextTuple();
+ }
- @Override
- protected void attach(Tuple tuple) {
- poLimit.setInputs(null);
- poLimit.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return poLimit.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- }
- };
+ @Override
+ protected void endOfInput() {
}
};
}
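LimitConverter exercises both shim entry points seen in the hunk above: coalesce, whose Java-visible signature differs between Spark 1.x and 2.x (the old 1.x call passed a trailing null Ordering), and flatMapFunction, which bridges FlatMapFunctionAdapter to Spark's own interface. On a Spark 2.x classpath that bridge is nearly a pass-through, since 2.x's FlatMapFunction#call already returns an Iterator. A sketch of the 2.x side, assuming the adapter interface sketched earlier (not the committed shim source):

    import java.util.Iterator;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public final class Spark2BridgeSketch {
        // On Spark 2.x the Iterator-returning signatures already line up,
        // so wrapping the adapter only forwards the call.
        public static <T, R> FlatMapFunction<T, R> flatMapFunction(
                final FlatMapFunctionAdapter<T, R> adapter) {
            return new FlatMapFunction<T, R>() {
                @Override
                public Iterator<R> call(T input) throws Exception {
                    return adapter.call(input);
                }
            };
        }
    }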
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java Wed Jul 19 01:32:41 2017
@@ -24,9 +24,10 @@ import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@@ -37,38 +38,32 @@ public class MergeCogroupConverter imple
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
        MergeCogroupFunction mergeCogroupFunction = new MergeCogroupFunction(physicalOperator);
- return rdd.toJavaRDD().mapPartitions(mergeCogroupFunction, true).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeCogroupFunction), true).rdd();
}
private static class MergeCogroupFunction implements
- FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable {
private POMergeCogroup poMergeCogroup;
@Override
-        public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception {
- return new Iterable<Tuple>() {
+ public Iterator<Tuple> call(final Iterator<Tuple> input) {
+ return new OutputConsumerIterator(input) {
@Override
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(input) {
+ protected void attach(Tuple tuple) {
+ poMergeCogroup.setInputs(null);
+ poMergeCogroup.attachInput(tuple);
+ }
- @Override
- protected void attach(Tuple tuple) {
- poMergeCogroup.setInputs(null);
- poMergeCogroup.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return poMergeCogroup.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- poMergeCogroup.setEndOfInput(true);
- }
- };
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return poMergeCogroup.getNextTuple();
+ }
+
+ @Override
+ protected void endOfInput() {
+ poMergeCogroup.setEndOfInput(true);
}
};
}
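MergeCogroupConverter repeats the pattern, reaching the bridge through SparkShims.getInstance(). The committed SparkShims source is not shown in this mail; a loader of this kind typically detects the Spark version once and reflectively instantiates the matching implementation. Sketched below with hypothetical class names (only the dispatch pattern is asserted, not the real implementation):

    // Hypothetical stand-in for a version-dispatching shim loader.
    public abstract class ShimLoaderSketch {
        private static ShimLoaderSketch instance;

        public static synchronized ShimLoaderSketch getInstance() {
            if (instance == null) {
                // Spark's package object exposes SPARK_VERSION in both
                // 1.x and 2.x, so it can be consulted from Java like this.
                String version = org.apache.spark.package$.MODULE$.SPARK_VERSION();
                String impl = version.startsWith("2")
                        ? "com.example.shims.Shim2"   // hypothetical names
                        : "com.example.shims.Shim1";
                try {
                    instance = (ShimLoaderSketch) Class.forName(impl).newInstance();
                } catch (Exception e) {
                    throw new RuntimeException("Cannot load shim class " + impl, e);
                }
            }
            return instance;
        }
    }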
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Wed Jul 19 01:32:41 2017
@@ -25,9 +25,10 @@ import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@@ -43,11 +44,11 @@ public class MergeJoinConverter implemen
RDD<Tuple> rdd = predecessors.get(0);
        MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin);
- return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(mergeJoinFunction), true).rdd();
}
private static class MergeJoinFunction implements
- FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable {
private POMergeJoin poMergeJoin;
@@ -55,29 +56,24 @@ public class MergeJoinConverter implemen
this.poMergeJoin = poMergeJoin;
}
- public Iterable<Tuple> call(final Iterator<Tuple> input) {
+ @Override
+ public Iterator<Tuple> call(final Iterator<Tuple> input) {
+ return new OutputConsumerIterator(input) {
- return new Iterable<Tuple>() {
@Override
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(input) {
+ protected void attach(Tuple tuple) {
+ poMergeJoin.setInputs(null);
+ poMergeJoin.attachInput(tuple);
+ }
- @Override
- protected void attach(Tuple tuple) {
- poMergeJoin.setInputs(null);
- poMergeJoin.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return poMergeJoin.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- poMergeJoin.setEndOfInput(true);
- }
- };
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return poMergeJoin.getNextTuple();
+ }
+
+ @Override
+ protected void endOfInput() {
+ poMergeJoin.setEndOfInput(true);
}
};
}
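MergeJoinConverter is the same transformation once more. For completeness, the Spark 1.x side of the bridge is the one with real work to do: 1.x's FlatMapFunction#call returns an Iterable, so the adapter's Iterator has to be wrapped again. A sketch under the same assumptions as above (it would compile only against Spark 1.x, which is why the two bridges would live in separate classes, only one of which is built against the Spark version at hand):

    import java.util.Iterator;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public final class Spark1BridgeSketch {
        // 1.x contract: call() returns Iterable<R>. Invoke the adapter
        // eagerly and expose its Iterator through a one-shot Iterable.
        public static <T, R> FlatMapFunction<T, R> flatMapFunction(
                final FlatMapFunctionAdapter<T, R> adapter) {
            return new FlatMapFunction<T, R>() {
                @Override
                public Iterable<R> call(final T input) throws Exception {
                    final Iterator<R> result = adapter.call(input);
                    return new Iterable<R>() {
                        @Override
                        public Iterator<R> iterator() {
                            return result;
                        }
                    };
                }
            };
        }
    }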
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java Wed Jul 19 01:32:41 2017
@@ -19,10 +19,11 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
import java.io.IOException;
@@ -37,10 +38,10 @@ public class PoissonSampleConverter impl
SparkUtil.assertPredecessorSize(predecessors, po, 1);
RDD<Tuple> rdd = predecessors.get(0);
        PoissionSampleFunction poissionSampleFunction = new PoissionSampleFunction(po);
-        return rdd.toJavaRDD().mapPartitions(poissionSampleFunction, false).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(poissionSampleFunction), false).rdd();
}
-    private static class PoissionSampleFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+    private static class PoissionSampleFunction implements FlatMapFunctionAdapter<Iterator<Tuple>, Tuple> {
private final POPoissonSampleSpark po;
@@ -49,29 +50,23 @@ public class PoissonSampleConverter impl
}
@Override
- public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+ public Iterator<Tuple> call(final Iterator<Tuple> tuples) {
+ return new OutputConsumerIterator(tuples) {
- return new Iterable<Tuple>() {
+ @Override
+ protected void attach(Tuple tuple) {
+ po.setInputs(null);
+ po.attachInput(tuple);
+ }
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(tuples) {
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return po.getNextTuple();
+ }
- @Override
- protected void attach(Tuple tuple) {
- po.setInputs(null);
- po.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return po.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- po.setEndOfInput(true);
- }
- };
+ @Override
+ protected void endOfInput() {
+ po.setEndOfInput(true);
}
};
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Wed Jul 19 01:32:41 2017
@@ -22,6 +22,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import scala.Tuple2;
import org.apache.commons.logging.Log;
@@ -30,13 +33,11 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
/**
@@ -56,13 +57,13 @@ public class SecondaryKeySortUtil {
        JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
new IndexedKeyPartitioner(partitionNums));
        //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
- return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
+        return sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(new AccumulateByKey(pkgOp)), true).rdd();
}
    //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
    //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the result
-    private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
-            Serializable {
+    private static class AccumulateByKey
+            implements FlatMapFunctionAdapter<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>, Serializable {
private POPackage pkgOp;
public AccumulateByKey(POPackage pkgOp) {
@@ -70,7 +71,7 @@ public class SecondaryKeySortUtil {
}
@Override
-        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
+        public Iterator<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) {
return new Iterable<Tuple>() {
Object curKey = null;
ArrayList curValues = new ArrayList();
@@ -132,7 +133,7 @@ public class SecondaryKeySortUtil {
}
};
}
- };
+ }.iterator();
}
        private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {