[
https://issues.apache.org/jira/browse/BEAM-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276937#comment-16276937
]
ASF GitHub Bot commented on BEAM-1920:
--------------------------------------
jbonofre closed pull request #3808: [BEAM-1920] Add a Spark 2.x support in the
Spark runner
URL: https://github.com/apache/beam/pull/3808
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the original diff after such a merge):
diff --git a/build.gradle b/build.gradle
index 7b2522770fd..4ddf2cb89e2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -38,7 +38,7 @@ def grpc_google_common_protos = "0.1.9"
def hamcrest_version = "1.3"
def hadoop_version = "2.7.3"
def jackson_version = "2.8.9"
-def spark_version = "1.6.3"
+def spark_version = "2.2.0"
def pubsub_grpc_version = "0.1.18"
def apex_core_version = "3.6.0"
def apex_malhar_version = "3.4.0"
@@ -139,9 +139,9 @@ ext.library = [
slf4j_jdk14: "org.slf4j:slf4j-jdk14:1.7.25",
slf4j_log4j12: "org.slf4j:slf4j-log4j12:1.7.25",
snappy_java: "org.xerial.snappy:snappy-java:1.1.4",
- spark_core: "org.apache.spark:spark-core_2.10:$spark_version",
- spark_network_common:
"org.apache.spark:spark-network-common_2.10:$spark_version",
- spark_streaming: "org.apache.spark:spark-streaming_2.10:$spark_version",
+ spark_core: "org.apache.spark:spark-core_2.11:$spark_version",
+ spark_network_common:
"org.apache.spark:spark-network-common_2.11:$spark_version",
+ spark_streaming: "org.apache.spark:spark-streaming_2.11:$spark_version",
stax2_api: "org.codehaus.woodstox:stax2-api:3.1.4",
woodstox_core_asl: "org.codehaus.woodstox:woodstox-core-asl:4.4.1",
],
diff --git a/pom.xml b/pom.xml
index efddbeb7e73..fdfd964b085 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,7 +147,7 @@
<pubsub.version>v1-rev10-1.22.0</pubsub.version>
<slf4j.version>1.7.25</slf4j.version>
<spanner.version>0.20.0-beta</spanner.version>
- <spark.version>1.6.3</spark.version>
+ <spark.version>2.2.0</spark.version>
<spring.version>4.3.5.RELEASE</spring.version>
<stax2.version>3.1.4</stax2.version>
<storage.version>v1-rev71-1.22.0</storage.version>
@@ -1313,19 +1313,19 @@
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-network-common_2.10</artifactId>
+ <artifactId>spark-network-common_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
index 02ba8cb16eb..bd53b0a64b0 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -77,6 +77,27 @@ public void update(String step, MetricsContainerImpl
container) {
getContainer(step).update(container);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MetricsContainerStepMap that = (MetricsContainerStepMap) o;
+
+ return metricsContainers != null
+ ? metricsContainers.equals(that.metricsContainers)
+ : that.metricsContainers == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return metricsContainers != null ? metricsContainers.hashCode() : 0;
+ }
+
/**
* Returns {@link MetricResults} based on given
* {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and
committed metrics.
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 3f32da11e80..91556d77169 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -70,7 +70,7 @@ dependencies {
provided library.java.commons_io_2x
provided library.java.hamcrest_core
provided "org.apache.zookeeper:zookeeper:3.4.6"
- provided "org.scala-lang:scala-library:2.10.5"
+ provided "org.scala-lang:scala-library:2.11.8"
provided "com.esotericsoftware.kryo:kryo:2.21"
testCompile project(path:
":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-io-parent:beam-sdks-java-io-kafka",
configuration: "shadow")
testCompile project(path:
":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration:
"shadowTest")
@@ -81,7 +81,7 @@ dependencies {
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.jackson_dataformat_yaml
- testCompile "org.apache.kafka:kafka_2.10:0.9.0.1"
+ testCompile "org.apache.kafka:kafka_2.11:0.9.0.1"
}
configurations.testRuntimeClasspath {
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index c4dc4f4f9d2..d8f1ea75da0 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -136,17 +136,17 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <artifactId>spark-streaming_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-network-common_2.10</artifactId>
+ <artifactId>spark-network-common_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@@ -159,17 +159,27 @@
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>com.esotericsoftware.kryo</groupId>
- <artifactId>kryo</artifactId>
- <version>2.21</version>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ <version>3.0.3</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -231,7 +241,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.10.5</version>
+ <version>2.11.8</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -281,6 +291,24 @@
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.9.0.1</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- test dependencies -->
@@ -301,10 +329,10 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${kafka.version}</version>
- <scope>test</scope>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
+ <version>${jackson.version}</version>
+ <scope>provided</scope>
</dependency>
<!-- Depend on test jar to scan for ValidatesRunner tests -->
@@ -426,5 +454,29 @@
</plugin>
</plugins>
</pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <enforceBytecodeVersion>
+ <maxJdkVersion>1.8</maxJdkVersion>
+ </enforceBytecodeVersion>
+ <requireJavaVersion>
+ <version>[1.8,)</version>
+ </requireJavaVersion>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 26af0c02ef9..adcb56c340f 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
+import java.util.Iterator;
+
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -309,8 +311,8 @@ public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2)
throws Exception {
implements FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]> {
@Override
- public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws
Exception {
- return t2._1();
+ public Iterator<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws
Exception {
+ return t2._1().iterator();
}
}
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1fb8700fe25..b7f0083ced5 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -534,7 +534,7 @@ public Boolean call(
keyCoder, wvCoder.getValueCoder(),
wvCoder.getWindowCoder());
@Override
- public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
+ public java.util.Iterator<WindowedValue<KV<K,
Iterable<InputT>>>> call(
final Tuple2<
/*K*/ ByteArray,
Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/
List<byte[]>>>
@@ -542,7 +542,8 @@ public Boolean call(
throws Exception {
// drop the state since it is already persisted at this point.
// return in serialized form.
- return CoderHelpers.fromByteArrays(t2._2()._2(),
windowedValueKeyValueCoder);
+ return CoderHelpers.fromByteArrays(t2._2()._2(),
windowedValueKeyValueCoder)
+ .iterator();
}
});
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index 4fd8146d21c..1e58aab5e91 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -21,8 +21,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,7 +37,6 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
-
/**
* An implementation of {@link TimerInternals} for the SparkRunner.
*/
@@ -101,8 +102,9 @@ public static SparkTimerInternals global(Map<Integer,
SparkWatermarks> watermark
return timers;
}
- void addTimers(Iterable<TimerData> timers) {
- for (TimerData timer: timers) {
+ void addTimers(Iterator<TimerData> timers) {
+ while (timers.hasNext()) {
+ TimerData timer = timers.next();
this.timers.add(timer);
}
}
@@ -168,9 +170,9 @@ public void deleteTimer(StateNamespace namespace, String
timerId) {
return CoderHelpers.toByteArrays(timers, timerDataCoder);
}
- public static Iterable<TimerData> deserializeTimers(
+ public static Iterator<TimerData> deserializeTimers(
Collection<byte[]> serTimers, TimerDataCoder timerDataCoder) {
- return CoderHelpers.fromByteArrays(serTimers, timerDataCoder);
+ return CoderHelpers.fromByteArrays(serTimers, timerDataCoder).iterator();
}
@Override
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 72995833982..d26a6f07c2f 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -101,7 +101,7 @@ public MultiDoFnFunction(
}
@Override
- public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>> call(
+ public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(
Iterator<WindowedValue<InputT>> iter) throws Exception {
DoFnOutputManager outputManager = new DoFnOutputManager();
@@ -151,7 +151,8 @@ public TimerInternals timerInternals() {
return new SparkProcessContext<>(
doFn, doFnRunnerWithMetrics, outputManager,
stateful ? new TimerDataIterator(timerInternals) :
-
Collections.<TimerInternals.TimerData>emptyIterator()).processPartition(iter);
+
Collections.<TimerInternals.TimerData>emptyIterator()).processPartition(iter)
+ .iterator();
}
private static class TimerDataIterator implements
Iterator<TimerInternals.TimerData> {
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index fcf438c96fd..e5b66e0e854 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import
org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.runners.core.InMemoryTimerInternals;
@@ -71,7 +72,7 @@ public SparkGroupAlsoByWindowViaOutputBufferFn(
}
@Override
- public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
+ public Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call(
WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>> windowedValue)
throws Exception {
K key = windowedValue.getValue().getKey();
Iterable<WindowedValue<InputT>> values =
windowedValue.getValue().getValue();
@@ -115,7 +116,7 @@ public SparkGroupAlsoByWindowViaOutputBufferFn(
reduceFnRunner.persist();
- return outputter.getOutputs();
+ return outputter.getOutputs().iterator();
}
private void fireEligibleTimers(InMemoryTimerInternals timerInternals,
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 90f5ee3b2e0..42867b6e992 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -149,7 +149,7 @@ public T2 call(Tuple2<T1, T2> v1) throws Exception {
public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V>
toPairFlatMapFunction() {
return new PairFlatMapFunction<Iterator<KV<K, V>>, K, V>() {
@Override
- public Iterable<Tuple2<K, V>> call(final Iterator<KV<K, V>> itr) {
+ public Iterator<Tuple2<K, V>> call(final Iterator<KV<K, V>> itr) {
final Iterator<Tuple2<K, V>> outputItr =
Iterators.transform(
itr,
@@ -160,13 +160,7 @@ public T2 call(Tuple2<T1, T2> v1) throws Exception {
return new Tuple2<>(kv.getKey(), kv.getValue());
}
});
- return new Iterable<Tuple2<K, V>>() {
-
- @Override
- public Iterator<Tuple2<K, V>> iterator() {
- return outputItr;
- }
- };
+ return outputItr;
}
};
}
@@ -185,7 +179,7 @@ public T2 call(Tuple2<T1, T2> v1) throws Exception {
static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>
fromPairFlatMapFunction() {
return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() {
@Override
- public Iterable<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
+ public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
final Iterator<KV<K, V>> outputItr =
Iterators.transform(
itr,
@@ -195,12 +189,7 @@ public T2 call(Tuple2<T1, T2> v1) throws Exception {
return KV.of(t2._1(), t2._2());
}
});
- return new Iterable<KV<K, V>>() {
- @Override
- public Iterator<KV<K, V>> iterator() {
- return outputItr;
- }
- };
+ return outputItr;
}
};
}
@@ -351,7 +340,7 @@ public void call(T t) throws Exception {
return new PairFlatMapFunction<Iterator<T>, K, V>() {
@Override
- public Iterable<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
+ public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
final Iterator<Tuple2<K, V>> outputItr =
Iterators.transform(
itr,
@@ -366,13 +355,7 @@ public void call(T t) throws Exception {
}
}
});
- return new Iterable<Tuple2<K, V>>() {
-
- @Override
- public Iterator<Tuple2<K, V>> iterator() {
- return outputItr;
- }
- };
+ return outputItr;
}
};
}
@@ -394,7 +377,7 @@ public void call(T t) throws Exception {
return new FlatMapFunction<Iterator<InputT>, OutputT>() {
@Override
- public Iterable<OutputT> call(Iterator<InputT> itr) throws Exception {
+ public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception {
final Iterator<OutputT> outputItr =
Iterators.transform(
itr,
@@ -409,13 +392,7 @@ public OutputT apply(InputT t) {
}
}
});
- return new Iterable<OutputT>() {
-
- @Override
- public Iterator<OutputT> iterator() {
- return outputItr;
- }
- };
+ return outputItr;
}
};
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 8ad3ca41533..101bb33cc55 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -39,7 +39,6 @@
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockResult;
-import org.apache.spark.storage.BlockStore;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaBatchInfo;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
@@ -48,9 +47,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
+import scala.collection.Iterator;
+import scala.reflect.ClassTag;
/**
- * A {@link BlockStore} variable to hold the global watermarks for a
micro-batch.
+ * A store to hold the global watermarks for a micro-batch.
*
* <p>For each source, holds a queue for the watermarks of each micro-batch
that was read,
* and advances the watermarks according to the queue (first-in-first-out).
@@ -61,6 +62,8 @@
private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new
HashMap<>();
private static final BlockId WATERMARKS_BLOCK_ID =
BlockId.apply("broadcast_0WATERMARKS");
+ private static final ClassTag<Map> WATERMARKS_TAG =
+ scala.reflect.ClassManifestFactory.fromClass(Map.class);
// a local copy of the watermarks is stored on the driver node so that it
can be
// accessed in test mode instead of fetching blocks remotely
@@ -245,7 +248,8 @@ private static void writeRemoteWatermarkBlock(
blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
// if an executor tries to fetch the watermark block here, it will fail to
do so since
// the watermark block has just been removed, but the new copy has not
been put yet.
- blockManager.putSingle(WATERMARKS_BLOCK_ID, newWatermarks,
StorageLevel.MEMORY_ONLY(), true);
+ blockManager.putSingle(WATERMARKS_BLOCK_ID, newWatermarks,
StorageLevel.MEMORY_ONLY(),
+ true, WATERMARKS_TAG);
// if an executor tries to fetch the watermark block here, it still may
fail to do so since
// the put operation might not have been executed yet
// see also https://issues.apache.org/jira/browse/BEAM-2789
@@ -262,7 +266,8 @@ private static void writeRemoteWatermarkBlock(
WATERMARKS_BLOCK_ID,
empty,
StorageLevel.MEMORY_ONLY(),
- true);
+ true,
+ WATERMARKS_TAG);
return empty;
} else {
return watermarks;
@@ -270,9 +275,16 @@ private static void writeRemoteWatermarkBlock(
}
private static Map<Integer, SparkWatermarks>
fetchSparkWatermarks(BlockManager blockManager) {
- final Option<BlockResult> blockResultOption =
blockManager.getRemote(WATERMARKS_BLOCK_ID);
+ final Option<BlockResult> blockResultOption =
blockManager.get(WATERMARKS_BLOCK_ID,
+ WATERMARKS_TAG);
if (blockResultOption.isDefined()) {
- return (Map<Integer, SparkWatermarks>)
blockResultOption.get().data().next();
+ Iterator<Object> data = blockResultOption.get().data();
+ Map<Integer, SparkWatermarks> next = (Map<Integer, SparkWatermarks>)
data.next();
+ // Spark 2 only triggers completion at the end of the iterator.
+ while (data.hasNext()) {
+ // NO-OP
+ }
+ return next;
} else {
return null;
}
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 85440ffe1e0..1116b972e6a 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -235,12 +235,12 @@
<!-- optional -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <artifactId>spark-streaming_2.11</artifactId>
</dependency>
</dependencies>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add Spark 2.x support in Spark runner
> -------------------------------------
>
> Key: BEAM-1920
> URL: https://issues.apache.org/jira/browse/BEAM-1920
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Jean-Baptiste Onofré
> Assignee: Jean-Baptiste Onofré
>
> I have a branch working with both Spark 1 and Spark 2 backend.
> I'm preparing a pull request about that.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)