[
https://issues.apache.org/jira/browse/BEAM-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nikos Karavasilis updated BEAM-12557:
-------------------------------------
Resolution: Fixed
Status: Resolved (was: Triage Needed)
> Beam: Kafka with Spark Runner configuration
> -------------------------------------------
>
> Key: BEAM-12557
> URL: https://issues.apache.org/jira/browse/BEAM-12557
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-spark
> Affects Versions: 2.30.0
> Reporter: Nikos Karavasilis
> Priority: P2
>
> I am new to the Beam project, and one task of my bachelor thesis is to do some
> benchmarking with Beam. I have created a simple **number-count** program by
> modifying a word-count example
> (https://dzone.com/articles/unbounded-stream-processing-using-apache-beam).
> I use a simple **Kafka topic** (1 partition) and periodically produce a number
> together with a timestamp (event time). The problem is that the pipeline runs
> **fine** with the **Direct runner**, but with the **Spark runner** it **fails**
> without any error: as soon as Spark has configured the executors it shuts down,
> instead of waiting for Kafka records. I tested it both on YARN and in standalone
> mode, with the same result.
> After spending a week on this I still can't figure it out; any help is highly appreciated.
> **I suspect that pom.xml might be missing something.**
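> For reference, the value I produce to the topic is a small POJO roughly like the
> sketch below (field names are inferred from the getters used in the pipeline; the
> real class also has a matching `RecordDeserializer` and `RecordSerializableCoder`
> that I have omitted here):
> ```
> import java.io.Serializable;
>
> // Sketch of the Kafka value type: a number plus its event-time timestamp
> // in epoch milliseconds (field names are assumptions, not the exact class).
> public class Record implements Serializable {
>     private long number;
>     private long timestamp;
>
>     public Record() {}
>
>     public Record(long number, long timestamp) {
>         this.number = number;
>         this.timestamp = timestamp;
>     }
>
>     public long getNumber() {
>         return number;
>     }
>
>     public long getTimestamp() {
>         return timestamp;
>     }
> }
> ```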
> ```
> Beam: 2.30.0
> Spark: 2.4.7
> Kafka: 2.4.1
> Hadoop: 2.7.7
> Java: 8 (1.8.0_291)
> OS: Ubuntu 18.04 Lts
> ```
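> The `options` object used in the pipeline below comes from a custom
> PipelineOptions sub-interface that I have not pasted; it is roughly along these
> lines (the interface name and default values are placeholders):
> ```
> import org.apache.beam.sdk.options.Default;
> import org.apache.beam.sdk.options.Description;
> import org.apache.beam.sdk.options.PipelineOptions;
>
> // Placeholder options interface; getter names match the calls in the pipeline.
> public interface NumberCountOptions extends PipelineOptions {
>
>     @Description("Kafka bootstrap servers")
>     @Default.String("localhost:9092")
>     String getBootstrap();
>     void setBootstrap(String value);
>
>     @Description("Kafka input topic")
>     @Default.String("numbers")
>     String getInputTopic();
>     void setInputTopic(String value);
>
>     @Description("Output file prefix for the windowed counts")
>     @Default.String("counts")
>     String getOutput();
>     void setOutput(String value);
> }
>
> // Parsed in main() with:
> // NumberCountOptions options =
> //     PipelineOptionsFactory.fromArgs(args).withValidation().as(NumberCountOptions.class);
> ```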
> The pipeline:
> ```
> Pipeline pipeline = Pipeline.create(options);
>
> Duration WINDOW_TIME = Duration.standardSeconds(5);
> Duration ALLOWED_LATENESS = Duration.standardSeconds(5);
>
> CoderRegistry cr = pipeline.getCoderRegistry();
> cr.registerCoderForClass(Record.class, new RecordSerializableCoder());
>
> pipeline.apply(
>         KafkaIO.<Long, Record>read()
>             .withBootstrapServers(options.getBootstrap())
>             .withTopic(options.getInputTopic())
>             .withKeyDeserializer(LongDeserializer.class)
>             .withValueDeserializer(RecordDeserializer.class)
>             .withTimestampPolicyFactory(
>                 (tp, previousWaterMark) -> new CustomFieldTimePolicy(previousWaterMark))
>             .withConsumerConfigUpdates(ImmutableMap.of("group.id", "test.group"))
>             .withoutMetadata())
>     .apply(Values.<Record>create())
>     .apply("append event time for PCollection records",
>         WithTimestamps.of((Record rec) -> new Instant(rec.getTimestamp())))
>     .apply("extract number", MapElements
>         .into(TypeDescriptors.longs())
>         .via(Record::getNumber))
>     .apply("apply window", Window.<Long>into(FixedWindows.of(WINDOW_TIME))
>         .withAllowedLateness(ALLOWED_LATENESS)
>         .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>         .accumulatingFiredPanes())
>     .apply("count numbers", new CountNumbers())
>     .apply("format result to String", MapElements
>         .into(TypeDescriptors.strings())
>         .via((KV<Long, Long> rec) -> rec.getKey() + ":" + rec.getValue()))
>     .apply("Write it to a text file", new WriteOneFilePerWindow(options.getOutput()));
>
> pipeline.run();
> ```
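> The `CustomFieldTimePolicy` used above is taken from the linked word-count
> example: it assigns each record the event time carried in the payload and
> advances the watermark to the last timestamp seen on the partition. Roughly
> (a sketch based on that example, not verified line by line against my class):
> ```
> import java.util.Optional;
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.io.kafka.TimestampPolicy;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.joda.time.Instant;
>
> // Event-time timestamp policy: the record's embedded timestamp becomes both
> // the element timestamp and the per-partition watermark.
> public class CustomFieldTimePolicy extends TimestampPolicy<Long, Record> {
>
>     private Instant currentWatermark;
>
>     public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
>         this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>     }
>
>     @Override
>     public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<Long, Record> record) {
>         currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
>         return currentWatermark;
>     }
>
>     @Override
>     public Instant getWatermark(PartitionContext ctx) {
>         return currentWatermark;
>     }
> }
> ```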
> **pom.xml**
> ```
> <!-- Spark runner profile, followed by the general project dependencies -->
> <profile>
> <id>spark-runner</id>
> <properties>
> <netty.version>4.1.17.Final</netty.version>
> </properties>
> <dependencies>
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-runners-spark</artifactId>
> <version>${beam.version}</version>
> <scope>runtime</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-core_2.11</artifactId>
> <version>${spark.version}</version>
> <!--<scope>runtime</scope>-->
> </dependency>
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
> <version>${beam.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-sql_2.11</artifactId>
> <version>${spark.version}</version>
> <!--<scope>runtime</scope>-->
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-catalyst_2.11</artifactId>
> <version>${spark.version}</version>
> <!--<scope>runtime</scope>-->
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-streaming_2.11</artifactId>
> <version>${spark.version}</version>
> <!-- <scope>runtime</scope> -->
> <exclusions>
> <exclusion>
> <groupId>org.slf4j</groupId>
> <artifactId>jul-to-slf4j</artifactId>
> </exclusion>
> </exclusions>
> </dependency>
> <dependency>
> <groupId>com.fasterxml.jackson.core</groupId>
> <artifactId>jackson-core</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>com.fasterxml.jackson.core</groupId>
> <artifactId>jackson-annotations</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>com.fasterxml.jackson.core</groupId>
> <artifactId>jackson-databind</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>com.fasterxml.jackson.module</groupId>
> <artifactId>jackson-module-scala_2.11</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> </dependencies>
> </profile>
> <dependencies>
> <dependency>
> <groupId>com.fasterxml.jackson.module</groupId>
> <artifactId>jackson-module-scala_2.11</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>com.fasterxml.jackson.core</groupId>
> <artifactId>jackson-core</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>com.fasterxml.jackson.core</groupId>
> <artifactId>jackson-annotations</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>com.fasterxml.jackson.core</groupId>
> <artifactId>jackson-databind</artifactId>
> <version>${jackson.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-runners-spark</artifactId>
> <version>${beam.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
> <version>${beam.version}</version>
> <!-- <scope>runtime</scope> -->
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-core_2.11</artifactId>
> <version>${spark.version}</version>
> <exclusions>
> <!-- <exclusion>
> <groupId>log4j</groupId>
> <artifactId>log4j</artifactId>
> </exclusion> -->
> <exclusion>
> <groupId>org.slf4j</groupId>
> <artifactId>jul-to-slf4j</artifactId>
> </exclusion>
> <exclusion>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-log4j12</artifactId>
> </exclusion>
> </exclusions>
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-streaming_2.11</artifactId>
> <version>${spark.version}</version>
> <!-- <scope>runtime</scope> -->
> <exclusions>
> <exclusion>
> <groupId>org.slf4j</groupId>
> <artifactId>jul-to-slf4j</artifactId>
> </exclusion>
> </exclusions>
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-sql_2.11</artifactId>
> <version>${spark.version}</version>
> <!--<scope>runtime</scope>-->
> </dependency>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-catalyst_2.11</artifactId>
> <version>${spark.version}</version>
> <!--<scope>runtime</scope>-->
> </dependency>
> <!-- Add slf4j API frontend binding with JUL backend -->
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-api</artifactId>
> <version>${slf4j.version}</version>
> </dependency>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-jdk14</artifactId>
> <version>${slf4j.version}</version>
> <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
> <scope>runtime</scope>
> </dependency>
> <!-- Adds a dependency on the Beam SDK. -->
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-sdks-java-core</artifactId>
> <version>${beam.version}</version>
> </dependency>
> <dependency>
> <groupId>joda-time</groupId>
> <artifactId>joda-time</artifactId>
> <version>${joda.version}</version>
> </dependency>
> <!-- The DirectRunner is needed for unit tests. -->
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-runners-direct-java</artifactId>
> <version>${beam.version}</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.mockito</groupId>
> <artifactId>mockito-core</artifactId>
> <version>${mockito.version}</version>
> <scope>test</scope>
> </dependency>
> <!--
> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
> <dependency>
> <groupId>org.apache.beam</groupId>
> <artifactId>beam-sdks-java-io-kafka</artifactId>
> <version>${beam.version}</version>
> </dependency>
> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>${kafka.version}</version>
> </dependency>
> <dependency>
> <groupId>junit</groupId>
> <artifactId>junit</artifactId>
> <version>${junit.version}</version>
> </dependency>
> </dependencies>
> ```
> Submitting the pipeline to YARN:
> ```
> ./spark-submit \
>   --class com.nikarav.WindowedNumberCount \
>   --master yarn \
>   target/count-numbers-bundled-1.0.jar \
>   --runner=SparkRunner \
>   --output=counts
> ```
> The YARN log:
> ```
> 21/06/30 01:50:59 INFO yarn.YarnAllocator: Will request 2 executor container(s), each with 1 core(s) and 1408 MB memory (including 384 MB of overhead)
> 21/06/30 01:50:59 INFO yarn.YarnAllocator: Submitted 2 unlocalized container requests.
> 21/06/30 01:50:59 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
> 21/06/30 01:50:59 INFO impl.AMRMClientImpl: Received new token for : nikarav:42295
> 21/06/30 01:50:59 INFO yarn.YarnAllocator: Launching container container_1624916683431_0015_01_000002 on host nikarav for executor with ID 1
> 21/06/30 01:50:59 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
> 21/06/30 01:50:59 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
> 21/06/30 01:50:59 INFO impl.ContainerManagementProtocolProxy: Opening proxy : nikarav:42295
> 21/06/30 01:51:01 INFO yarn.YarnAllocator: Launching container container_1624916683431_0015_01_000003 on host nikarav for executor with ID 2
> 21/06/30 01:51:01 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
> 21/06/30 01:51:01 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
> 21/06/30 01:51:01 INFO impl.ContainerManagementProtocolProxy: Opening proxy : nikarav:42295
> 21/06/30 01:51:06 INFO yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
> 21/06/30 01:51:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. nikarav:43449
> 21/06/30 01:51:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. nikarav:43449
> 21/06/30 01:51:06 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
> 21/06/30 01:51:06 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
> 21/06/30 01:51:06 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
> 21/06/30 01:51:06 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://localhost:9000/user/hadoop/.sparkStaging/application_1624916683431_0015
> 21/06/30 01:51:06 INFO util.ShutdownHookManager: Shutdown hook called
> ```
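> One thing I am not sure about, so please treat it as an assumption on my part:
> `KafkaIO.read()` without `withMaxNumRecords`/`withMaxReadTime` is an unbounded
> source, and as far as I understand the SparkRunner only keeps the driver alive
> and polling Kafka when it runs in streaming mode. A minimal sketch of how I
> understand streaming mode could be forced (equivalent to passing
> `--streaming=true` on the command line):
> ```
> import org.apache.beam.runners.spark.SparkPipelineOptions;
> import org.apache.beam.runners.spark.SparkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> // Sketch (not my actual main): build SparkPipelineOptions with streaming
> // enabled so the driver does not finish immediately as a bounded batch job.
> SparkPipelineOptions sparkOptions =
>     PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
> sparkOptions.setRunner(SparkRunner.class);
> sparkOptions.setStreaming(true); // same effect as --streaming=true
>
> Pipeline pipeline = Pipeline.create(sparkOptions);
> ```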
> Full **log** file: https://pastebin.com/nhL5byvK
--
This message was sent by Atlassian Jira
(v8.3.4#803005)