Rick Lin created BEAM-4632:
------------------------------

             Summary: KafkaIO should run in streaming mode over the Spark runner
                 Key: BEAM-4632
                 URL: https://issues.apache.org/jira/browse/BEAM-4632
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka, runner-spark
    Affects Versions: 2.4.0
         Environment: Ubuntu 16.04.4 LTS
            Reporter: Rick Lin
            Assignee: Raghu Angadi
             Fix For: 2.4.0


Dear sir,

The following tool versions are used in my setup:

==================================

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.2.1 (local mode and standalone mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==================================

My programs (KafkaToKafka.java and StarterPipeline.java) are available on my 
GitHub: [https://github.com/LinRick/beamkafkaIO].

My situation is as follows:

{color:#14892c}The Kafka broker is working, and KafkaIO.read (consumer) is used 
to capture data from the assigned broker IP 
([http://ubuntu7:9092]).{color}

{color:#14892c}The user manual of the KafkaIO SDK (javadoc: 
[https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates 
that the following parameters need to be set for KafkaIO to work:{color}

{code:java}
KafkaIO.<Integer, String>read()
    .withBootstrapServers("kafka broker ip:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
{code}
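For clarity, here is a minimal, self-contained sketch of how these settings are wired into a complete pipeline (the broker ubuntu7:9092 and topic kafkasink come from my setup above; the class name KafkaReadSketch is only a placeholder):

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    // Pipeline options come from the command line, e.g. --runner=SparkRunner
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Unbounded read from the Kafka topic; withoutMetadata() yields
    // a PCollection<KV<Integer, String>> of the raw key/value pairs.
    p.apply(KafkaIO.<Integer, String>read()
        .withBootstrapServers("ubuntu7:9092")
        .withTopic("kafkasink")
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());

    p.run().waitUntilFinish();
  }
}
{code}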

When I run my program with these settings on the direct runner, it performs 
well and runs in streaming mode. 
*However, when I run the same code with the same KafkaIO settings on the Spark 
runner, the program does not run in streaming mode and shuts down.* As 
mentioned on the website 
[https://beam.apache.org/documentation/runners/spark/], the runner 
should set streaming mode automatically. 

Unfortunately, that does not happen for my program.
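One workaround I could try (this is my own assumption, not something the Spark runner page documents for this case): force streaming mode explicitly through StreamingOptions instead of relying on the runner to detect the unbounded source. A fragment:

{code:java}
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

// Equivalent to passing --streaming=true on the command line.
StreamingOptions options =
    PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
{code}

Whether this actually helps here is exactly what I am unsure about.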

On the other hand, if I set KafkaIO.read().withMaxNumRecords(1000) 
or KafkaIO.read().withMaxReadTime(Duration), my program 
executes successfully in batch mode (batch processing); see the fragment below.
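For comparison, a fragment showing the bounded variants that do work for me (the 1000-record limit is from my test; the 30-second duration is just an example value):

{code:java}
import org.joda.time.Duration;

// Either bound turns the source into a bounded read, so the Spark runner
// executes the pipeline in batch mode.
KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withMaxNumRecords(1000)
    // or: .withMaxReadTime(Duration.standardSeconds(30))
    .withoutMetadata();
{code}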

The steps to run StarterPipeline.java are:

{noformat}
step1: mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
step2: mvn clean package
step3: cp -rf target/beamkafkaIO-0.1.jar /root/
step4: cd /spark-2.2.1-bin-hadoop2.6/bin
step5: ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
{noformat}

I am not sure whether this issue is a bug in KafkaIO or whether I have some 
parameter settings wrong for the Spark runner.

I can't resolve it by myself, so I hope to get help from you.

If any further information is needed, I will gladly provide it as soon as 
possible.

I would highly appreciate your help with this issue.

I am looking forward to hearing from you.

Sincerely yours,

Rick

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
