[
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524674#comment-16524674
]
Rick Lin edited comment on BEAM-4632 at 6/27/18 7:04 AM:
---------------------------------------------------------
Dear [~aromanenko],
I have tried running my project with the Spark runner (standalone mode) to
{color:#d04437}capture more data from Kafka{color} into each window. The
settings of the driver/master/worker nodes are:
Driver node (ubuntu8):
spark-defaults.conf
{quote}spark.driver.memory 10g
spark.executor.memory 2g
spark.executor.instances 4
{quote}
master node (ubuntu8):
{quote}export SPARK_MASTER_IP="ubuntu8"
export SPARK_MASTER_WEBUI_PORT=8082
{quote}
The settings of the two worker nodes (ubuntu8 and ubuntu9) are the same as
those of the master node.
||ExecutorID||Worker||Cores||Memory||State||Logs||
|1|ubuntu9|4|2048|KILLED| |
|0|ubuntu8|4|2048|KILLED| |
In addition, my pipeline is configured as:
{quote}Pipeline p = Pipeline.create(options);
{color:#d04437}options.setMaxRecordsPerBatch(1000L);{color}
{color:#d04437}options.setSparkMaster("spark://ubuntu8:7077");{color}
...
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer,
String>read()
.withBootstrapServers("ubuntu7:9092")
.withTopic("kafkasink")
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata());
...
p.run().waitUntilFinish();
{quote}
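For reference, the snippet above assembled into a self-contained class. This is a sketch only: the class layout, the PipelineOptionsFactory usage, and the placement of the two setters before Pipeline.create() are my assumptions, not necessarily the reporter's exact code (it uses the Beam 2.4.0 SparkPipelineOptions API).

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StarterPipeline {
  public static void main(String[] args) {
    // Parse --runner=SparkRunner etc. from the command line, then fix the
    // Spark-specific options before the pipeline is created.
    SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
    options.setMaxRecordsPerBatch(1000L);
    options.setSparkMaster("spark://ubuntu8:7077");

    Pipeline p = Pipeline.create(options);

    // Unbounded read from the "kafkasink" topic; withoutMetadata() keeps
    // only the KV<Integer, String> records.
    PCollection<KV<Integer, String>> readData =
        p.apply(KafkaIO.<Integer, String>read()
            .withBootstrapServers("ubuntu7:9092")
            .withTopic("kafkasink")
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());

    // ... windowing and downstream transforms elided, as in the report ...

    p.run().waitUntilFinish();
  }
}
```

Running this sketch requires a live Kafka broker and Spark master at the addresses shown, so it cannot be verified standalone.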
and I use the following command line to run the project:
./spark-submit --class com.itri.beam.StarterPipeline --master
spark://ubuntu8:7077 /root/beamkafkaIO-0.1.jar --runner=SparkRunner
After a while, my program crashes with the error shown in the attachment
(error UnboundedDataset.java 81(0) has different number of partitions.JPG):
"UnboundedDataset.java:81(0) has different number of partitions from original
RDD MapPartitionsRDD[698] at updateStateByKey at
SparkGroupAlsoByWindowViaWindowSet.java:612(2)"
Even when using only one worker node (ubuntu9), a similar error still appears.
Best,
Rick
> KafkaIO seems to fail on streaming mode over spark runner
> ---------------------------------------------------------
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-spark
> Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
> Reporter: Rick Lin
> Assignee: Alexey Romanenko
> Priority: Major
> Attachments: .withMaxNumRecords(500000).JPG,
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has
> different number of partitions.JPG, the error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are used in my program:
> ==================================
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==================================
> My programs (KafkaToKafka.java and StarterPipeline.java) are available on my
> GitHub: [https://github.com/LinRick/beamkafkaIO]
> My situation is as follows:
> {color:#14892c}The Kafka broker is working, and KafkaIO.read (consumer) is
> used to capture data from the assigned broker (ubuntu7:9092).{color}
> {color:#14892c}The KafkaIO SDK javadoc
> ([https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that
> the following parameters need to be set for KafkaIO to work:{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000} .withTopic("kafkasink"){color}
> {color:#FF0000} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000} .withValueDeserializer(StringDeserializer.class) {color}
> When I run my program with these settings over the direct runner, it
> performs well, and it runs in streaming mode. *However, when I run the same
> code (same KafkaIO settings) over the Spark runner, the program does not run
> in streaming mode and shuts down.*
> According to the Spark runner documentation
> ([https://beam.apache.org/documentation/runners/spark/]), streaming mode
> should be enabled automatically. Unfortunately, that did not happen for my
> program.
> On the other hand, if I set kafkaIO.read.withMaxNumRecords(1000) or
> kafkaIO.read.withMaxReadTime (a Duration), my program executes successfully
> in batch mode (batch processing).
> The steps to run StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4]
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in KafkaIO or a mistake in my
> parameter settings over the Spark runner.
> If any further information is needed, I am glad to provide it as soon as
> possible.
> I would highly appreciate your help with this issue, and I am looking
> forward to hearing from you.
>
> Sincerely yours,
>
> Rick
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)