Nikos Karavasilis created BEAM-12557:
----------------------------------------
Summary: Beam: Kafka with Spark Runner configuration
Key: BEAM-12557
URL: https://issues.apache.org/jira/browse/BEAM-12557
Project: Beam
Issue Type: Bug
Components: io-java-kafka, runner-spark
Affects Versions: 2.30.0
Reporter: Nikos Karavasilis
I am new to the Beam project, and one task of my bachelor thesis is to do some
benchmarking with Beam. I have created a simple "**number-count**" program by
modifying a word-count example
(https://dzone.com/articles/unbounded-stream-processing-using-apache-beam).
I use a simple **Kafka** **topic** (1 partition) and periodically produce a
number and a timestamp (event time) to it. The problem: the pipeline works
**fine** with the **Direct runner**, but with the **Spark** runner it **fails**
without an error. As soon as Spark has configured the executors, it shuts down
instead of waiting for Kafka records. I tested it on both YARN and standalone,
with the same result.
After spending a week on this, I can't figure it out. Any help is highly appreciated.
**I suspect that pom.xml is missing something.**
```
Beam: 2.30.0
Spark: 2.4.7
Kafka: 2.4.1
Hadoop: 2.7.7
Java: 8 (1.8.0_291)
OS: Ubuntu 18.04 LTS
```
The pipeline:
```
Pipeline pipeline = Pipeline.create(options);
Duration WINDOW_TIME = Duration.standardSeconds(5);
Duration ALLOWED_LATENESS = Duration.standardSeconds(5);
CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoderForClass(Record.class, new RecordSerializableCoder());
pipeline.apply(
        KafkaIO.<Long, Record>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(RecordDeserializer.class)
            .withTimestampPolicyFactory(
                (tp, previousWaterMark) -> new CustomFieldTimePolicy(previousWaterMark))
            .withConsumerConfigUpdates(ImmutableMap.of("group.id", "test.group"))
            .withoutMetadata())
    .apply(Values.<Record>create())
    .apply("append event time for PCollection records",
        WithTimestamps.of((Record rec) -> new Instant(rec.getTimestamp())))
    .apply("extract number",
        MapElements
            .into(TypeDescriptors.longs())
            .via(Record::getNumber))
    .apply("apply window",
        Window.<Long>into(FixedWindows.of(WINDOW_TIME))
            .withAllowedLateness(ALLOWED_LATENESS)
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .accumulatingFiredPanes())
    .apply("count numbers", new CountNumbers())
    .apply("format result to String",
        MapElements
            .into(TypeDescriptors.strings())
            .via((KV<Long, Long> rec) -> rec.getKey() + ":" + rec.getValue()))
    .apply("Write it to a text file", new WriteOneFilePerWindow(options.getOutput()));
pipeline.run();
```
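For reference, the result I expect from the windowing and counting steps can be sketched in plain Java without any Beam dependency. This is only an illustration: the class and method names are hypothetical, it assumes `CountNumbers` counts occurrences per distinct number (as the word-count original does per word), and it ignores watermarks, triggers, and allowed lateness entirely.

```java
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowCountSketch {
    // Window size matching WINDOW_TIME in the pipeline (5 seconds).
    public static final long WINDOW_MILLIS = 5_000L;

    // Start (inclusive) of the fixed window that contains the event timestamp.
    public static long windowStart(long eventTimeMillis) {
        return eventTimeMillis - Math.floorMod(eventTimeMillis, WINDOW_MILLIS);
    }

    // Counts occurrences of each number per window.
    // Each input row is {eventTimeMillis, number}.
    public static Map<Long, Map<Long, Long>> countPerWindow(long[][] records) {
        Map<Long, Map<Long, Long>> counts = new TreeMap<>();
        for (long[] rec : records) {
            long start = windowStart(rec[0]);
            counts.computeIfAbsent(start, k -> new TreeMap<>())
                  .merge(rec[1], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample records, not taken from the real topic.
        long[][] records = {
            {1_000L, 7L}, {4_999L, 7L}, {5_000L, 7L}, {6_200L, 3L}
        };
        // Print one "number:count" line per window, like the pipeline's text output.
        countPerWindow(records).forEach((start, perNumber) ->
            perNumber.forEach((number, count) ->
                System.out.println("[" + start + ", " + (start + WINDOW_MILLIS) + ") "
                    + number + ":" + count)));
    }
}
```

A record with event time 4999 ms falls into the window starting at 0, while one at 5000 ms opens the next window, so the window end is exclusive.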
**pom.xml**
```
<!-- Spark profile and general dependencies -->
<profile>
  <id>spark-runner</id>
  <properties>
    <netty.version>4.1.17.Final</netty.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <!--<scope>runtime</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
      <version>${beam.version}</version>
      <!-- <scope>runtime</scope> -->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <!--<scope>runtime</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_2.11</artifactId>
      <version>${spark.version}</version>
      <!--<scope>runtime</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>jul-to-slf4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
    </dependency>
  </dependencies>
</profile>
<dependencies>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>${jackson.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>${jackson.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>${jackson.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>${beam.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>${beam.version}</version>
    <!-- <scope>runtime</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
      <!-- <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion> -->
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>jul-to-slf4j</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
    <!-- <scope>runtime</scope> -->
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>jul-to-slf4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <!--<scope>runtime</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-catalyst_2.11</artifactId>
    <version>${spark.version}</version>
    <!--<scope>runtime</scope>-->
  </dependency>
  <!-- Add slf4j API frontend binding with JUL backend -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-jdk14</artifactId>
    <version>${slf4j.version}</version>
    <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
    <scope>runtime</scope>
  </dependency>
  <!-- Adds a dependency on the Beam SDK. -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>${joda.version}</version>
  </dependency>
  <!-- The DirectRunner is needed for unit tests. -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-core</artifactId>
    <version>${mockito.version}</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>${junit.version}</version>
  </dependency>
```
Submitting the pipeline to YARN:
```
./spark-submit \
  --class com.nikarav.WindowedNumberCount \
  --master yarn \
  target/count-numbers-bundled-1.0.jar \
  --runner=SparkRunner \
  --output=counts
```
```
// yarn log
21/06/30 01:50:59 INFO yarn.YarnAllocator: Will request 2 executor container(s), each with 1 core(s) and 1408 MB memory (including 384 MB of overhead)
21/06/30 01:50:59 INFO yarn.YarnAllocator: Submitted 2 unlocalized container requests.
21/06/30 01:50:59 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
21/06/30 01:50:59 INFO impl.AMRMClientImpl: Received new token for : nikarav:42295
21/06/30 01:50:59 INFO yarn.YarnAllocator: Launching container container_1624916683431_0015_01_000002 on host nikarav for executor with ID 1
21/06/30 01:50:59 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
21/06/30 01:50:59 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
21/06/30 01:50:59 INFO impl.ContainerManagementProtocolProxy: Opening proxy : nikarav:42295
21/06/30 01:51:01 INFO yarn.YarnAllocator: Launching container container_1624916683431_0015_01_000003 on host nikarav for executor with ID 2
21/06/30 01:51:01 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
21/06/30 01:51:01 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
21/06/30 01:51:01 INFO impl.ContainerManagementProtocolProxy: Opening proxy : nikarav:42295
21/06/30 01:51:06 INFO yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
21/06/30 01:51:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. nikarav:43449
21/06/30 01:51:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. nikarav:43449
21/06/30 01:51:06 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
21/06/30 01:51:06 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
21/06/30 01:51:06 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
21/06/30 01:51:06 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://localhost:9000/user/hadoop/.sparkStaging/application_1624916683431_0015
21/06/30 01:51:06 INFO util.ShutdownHookManager: Shutdown hook called
```
**log** file: https://pastebin.com/nhL5byvK