[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524523#comment-16524523
 ] 

Rick Lin edited comment on BEAM-4632 at 6/27/18 6:25 AM:
---------------------------------------------------------

Dear all,

Thanks very much for your attention to this problem.

 

Hi [~aromanenko],

Yes, this project have been changed *beam.version* *from 2.5.0-SNAPSHOT  to 
2.4.0* and *spark2.jackson.version from 2.9.5 to  2.8.9* to run my project 
(updated on my Github). 

When running my pipeline with p.run().waitUntilFinish() for 
StarterPipeline.java program,{color:#d04437} *KafkaIO can be on streaming mode 
(spark runner with local[4])*.{color}

The output of my pipeline is to write the amount of data into the PostgreSQL 
(raw_c42a25f4bd3d74429dbeb6162e60e5c7/kafkabeamdata) each second, as follows:
{quote}{color:#205081}countData.apply(JdbcIO.<KV<String, Long>>write(){color}
 {color:#205081} 
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create({color}
 {color:#205081} "org.postgresql.Driver",{color}
 {color:#205081} 
"jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7"){color}
 {color:#205081} .withUsername("postgres"){color}
 {color:#205081} .withPassword("postgres")){color}
 {color:#205081} .withStatement("insert into kafkabeamdata (count) 
values(?)"){color}
 {color:#205081} .withPreparedStatementSetter(new 
JdbcIO.PreparedStatementSetter<KV<String, Long>>() {{color}
 {color:#205081} @Override{color}
 {color:#205081} public void setParameters(KV<String, Long> element, 
PreparedStatement query){color}
 {color:#205081} throws SQLException {{color}
 {color:#205081} double count = element.getValue().doubleValue();{color}
 {color:#205081} query.setDouble(1, count);{color}
 {color:#205081} }{color}
 {color:#205081} }));{color}
{quote}
The following figure shows that the amount of data can be wrote into the DB,as:

!DB_table_kafkabeamdata_count.JPG!

In the above table, we can see many zero values (count), and that means there 
is no data in most windows with the applied window/triggering/Watermark:
{quote}".apply(Window.<KV<Integer, 
String>>into(FixedWindows.of(Duration.standardSeconds(1)))
 .triggering(AfterWatermark.pastEndOfWindow()
 
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
 .withAllowedLateness(Duration.ZERO)
 .discardingFiredPanes())".
{quote}
In this project, I hope that the count of data can mostly be equal than the 
amount of data generated in kafka producer. For example, the figure shows when 
using setting "kafkaIO.Read.withMaxNumRecords(500000)" in the pipeline, as:

!.withMaxNumRecords(500000).JPG!

{color:#d04437}The table shows that there is a expected quantity of streaming 
data in each window.{color}

{color:#d04437}If I would like to realize the situation on a streaming mode, 
{color}

{color:#d04437}what can i do related settings for spark runner (set standalone 
mode), spark pipeline (set MaxRecordsPerBatch), and kafkaIO.Read ?{color}

 

On the other hand, there is another error:

"Caused by: java.lang.ClassNotFoundException: 
com.google.protobuf.GeneratedMessageV3"

as shown in attachments

!the error GeneratedMessageV3.JPG|width=1156,height=249!

For dealing with this error, I have added the required dependency (added on my 
Github), as:

"{color:#14892c}<protobuf-java.version>3.4.0</protobuf-java.version>{color}

{color:#14892c}<!-- 
[https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java] 
-->{color}
 {color:#14892c} <dependency>{color}
 {color:#14892c} <groupId>com.google.protobuf</groupId>{color}
 {color:#14892c} <artifactId>protobuf-java</artifactId>{color}
 {color:#14892c} <version>${protobuf-java.version}</version>{color}
 {color:#14892c} </dependency>{color}{color:#333333}"{color}

Rick


was (Author: ricklin):
Dear all,

Thanks very much for your attention to this problem.

 

Hi [~aromanenko],

Yes, this project have been changed *beam.version* *from 2.5.0-SNAPSHOT  to 
2.4.0* and *spark2.jackson.version from 2.9.5 to  2.8.9* to run my project 
(updated on my Github). 

When running my pipeline with p.run().waitUntilFinish() for 
StarterPipeline.java program,{color:#d04437} *KafkaIO can be on streaming mode 
(spark runner with local[4])*.{color}

The output of my pipeline is to write the amount of data into the PostgreSQL 
(raw_c42a25f4bd3d74429dbeb6162e60e5c7/kafkabeamdata) each second, as follows:
{quote}{color:#205081}countData.apply(JdbcIO.<KV<String, Long>>write(){color}
{color:#205081} 
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create({color}
{color:#205081} "org.postgresql.Driver",{color}
{color:#205081} 
"jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7"){color}
{color:#205081} .withUsername("postgres"){color}
{color:#205081} .withPassword("postgres")){color}
{color:#205081} .withStatement("insert into kafkabeamdata (count) 
values(?)"){color}
{color:#205081} .withPreparedStatementSetter(new 
JdbcIO.PreparedStatementSetter<KV<String, Long>>() {{color}
{color:#205081} @Override{color}
{color:#205081} public void setParameters(KV<String, Long> element, 
PreparedStatement query){color}
{color:#205081} throws SQLException {{color}
{color:#205081} double count = element.getValue().doubleValue();{color}
{color:#205081} query.setDouble(1, count);{color}
{color:#205081} }{color}
{color:#205081} }));{color}
{quote}
The following figure shows that the amount of data can be wrote into the DB,as:

!DB_table_kafkabeamdata_count.JPG!

In the above table, we can see many zero values (count), and that means there 
is no data in most windows with the applied window/triggering/Watermark:

".apply(Window.<KV<Integer, 
String>>into(FixedWindows.of(Duration.standardSeconds(1)))
 .triggering(AfterWatermark.pastEndOfWindow()
 
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
 .withAllowedLateness(Duration.ZERO)
 .discardingFiredPanes())".

In this project, I hope that the count of data can mostly be equal than the 
amount of data generated in kafka producer. For example, the figure shows when 
using setting "kafkaIO.Read.withMaxNumRecords(500000)" in the pipeline, as:

!.withMaxNumRecords(500000).JPG!

{color:#d04437}The table shows that there is a expected quantity of streaming 
data in each window.{color}

{color:#d04437}If I would like to realize the situation on a streaming mode, 
{color}

{color:#d04437}what can i do related settings for spark runner (set standalone 
mode), spark pipeline (set MaxRecordsPerBatch), and kafkaIO.Read ?{color}

 

On the other hand, there is another error:

"Caused by: java.lang.ClassNotFoundException: 
com.google.protobuf.GeneratedMessageV3"

as shown in attachments

!the error GeneratedMessageV3.JPG|width=1156,height=249!

For deal with this error, I have added the required dependency (added on my 
Github), as:

"{color:#14892c}<protobuf-java.version>3.4.0</protobuf-java.version>{color}

{color:#14892c}<!-- 
https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->{color}
{color:#14892c} <dependency>{color}
{color:#14892c} <groupId>com.google.protobuf</groupId>{color}
{color:#14892c} <artifactId>protobuf-java</artifactId>{color}
{color:#14892c} <version>${protobuf-java.version}</version>{color}
{color:#14892c} </dependency>{color}{color:#333333}"{color}

Rick

> KafkaIO seems to fail on streaming mode over spark runner
> ---------------------------------------------------------
>
>                 Key: BEAM-4632
>                 URL: https://issues.apache.org/jira/browse/BEAM-4632
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-spark
>    Affects Versions: 2.4.0
>         Environment: Ubuntu 16.04.4 LTS
>            Reporter: Rick Lin
>            Assignee: Alexey Romanenko
>            Priority: Major
>         Attachments: .withMaxNumRecords(500000).JPG, 
> DB_table_kafkabeamdata_count.JPG, the error GeneratedMessageV3.JPG, the error 
> GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==================================
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==================================
> My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on my 
> github: [https://github.com/LinRick/beamkafkaIO],
> The description of my situation is as:
> {color:#14892c}The kafka broker is working and kafkaIO.read (consumer) is 
> used to capture data from the assigned broker ip 
> ([http://ubuntu7:9092)|http://ubuntu7:9092)./].{color}
> {color:#14892c}The user manual of kafkaIO SDK (on 
> web:[https://beam.apache.org/documentation/sdks/javadoc/2.4.0/])  indicates 
> that the following parameters need to be set, and then the kafkaIO can work 
> well.{color}
>  {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000} .withTopic("kafkasink"){color}
> {color:#FF0000} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000} .withValueDeserializer(StringDeserializer.class) {color}
> When i run my program with these settings over direct runner, i can find that 
> my program perform well. In addition, my running program is the streaming 
> mode. *However, i run these codes with the same settings (kafkaIO) over spark 
> runner, and my running program is not the streaming mode and is shutdown*. 
> Here, as mentioned on the website: 
> [https://beam.apache.org/documentation/runners/spark/], the performing 
> program will automatically set streaming mode. 
> Unfortunately, it failed for my program.
> On the other hand, If i set the parameter  kafkaIO.read.withMaxNumRecords 
> (1000) or  kafkaIO.read.withMaxReadTime (Duration second), my program will 
> successfully execute as the batch mode (batch processing).
> The steps of performing StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] 
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure if this issue is a bug about kafkaIO or I was wrong with some 
> parameter settings over spark runner ?
> I really can't handle it, so I hope to get help from you.
> if any further information is needed, i am glad to be informed and will 
> provide to you as soon as possible.
> I will highly appreciate it if you can help me to deal with this issue.
> i am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to