Hello Xinyu,
After adding this property, the Beam pipeline is not getting triggered.
Here are the logs:
https://intuit.app.box.com/s/82cjpw9zabkcrhr1rfdg8mn8hc3zif89
This might be the relevant log snippet:
Jan 03, 2019 5:09:46 PM org.apache.samza.zk.ZkJobCoordinator lambda$onNewJobModelAvailable$3
INFO: New JobModel does not contain pid=8303d1c2-d616-4794-a751-fb55ac842522. Stopping this processor. New JobModel: JobModel [config={}, containers={0=ContainerModel [processorId=0, tasks={Partition 0=TaskModel [taskName=Partition 0, systemStreamPartitions=[SystemStreamPartition [0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, 0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, 0]], changeLogPartition=Partition [partition=0]]}]}]
Jan 03, 2019 5:09:46 PM org.apache.samza.zk.ZkJobCoordinator stop
INFO: Shutting down Job Coordinator...
And
INFO: Action: JobModelVersionChange completed successfully.
Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:106)
at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:78)
Let me know if I am missing something.
Also, without job.factory.class=org.apache.samza.job.local.ProcessJobFactory, I get this exception:
Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:88)
at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:112)
at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:47)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
at com.intuit.dedupe.beam.poc.StreamingWordCount.main(StreamingWordCount.java:151)
Caused by: org.apache.samza.SamzaException: no job factory class defined
at org.apache.samza.job.JobRunner.getJobFactory(JobRunner.scala:173)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:80)
at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:85)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:82)
... 5 more
Regards,
Omkar Deshpande
From: Xinyu Liu <[email protected]>
Date: Thursday, January 3, 2019 at 3:25 PM
To: "[email protected]" <[email protected]>, "Deshpande, Omkar"
<[email protected]>
Subject: Re: app.class or task.class for beam samza runner
Adding Omkar's email back to this list.
For your latest error, I think you need to add the following config, since you are running in standalone mode:
app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
Please keep us updated if you run into any further issues.
Thanks,
Xinyu
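Pulling the configuration advice in this thread together (set app.runner.class for standalone, drop job.factory.class, add job.default.system=kafka), Omkar's original config.properties would look roughly like the sketch below. It only assembles the suggestions made here and has not been verified end to end; the most recent message in the thread still reports a NullPointerException in SamzaPipelineResult after app.runner.class was added.

```properties
# Sketch assembled from the suggestions in this thread; not verified end to end.
app.name=test-app

# Standalone mode: use the local runner (suggested in Xinyu's latest reply)
app.runner.class=org.apache.samza.runtime.LocalApplicationRunner

# ZooKeeper-based coordination, unchanged from the original config
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=localhost:2181
job.coordinator.system=kafka

# Suggested for data repartitioning (Xinyu's earlier reply)
job.default.system=kafka

# job.factory.class=org.apache.samza.job.local.ProcessJobFactory
# (commented out: reported in this thread as not needed for standalone)

# Kafka System, unchanged from the original config
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
```

The ZooKeeper and Kafka addresses are the localhost values from the original message and would need to change for a real deployment.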
On Thu, Jan 3, 2019 at 12:14 PM Xinyu Liu <[email protected]> wrote:
As Prateek mentioned, I also double-checked the exception: it comes from a class (ApplicationUtil.java) that only exists in Samza 1.0. Please remove any Samza 1.0 dependencies, since the Beam API currently works with Samza 0.14.1.
Your config looks mostly correct to me. The following is not needed:
job.factory.class=org.apache.samza.job.local.ProcessJobFactory
And you probably need to configure this for any data repartitioning:
job.default.system=kafka
Thanks,
Xinyu
On Thu, Jan 3, 2019 at 10:03 AM Prateek Maheshwari <[email protected]> wrote:
Hi Omkar,
I think it's only possible to get that exception with Samza 1.0. Can
you verify that the deployment is indeed using Samza 0.14.1?
Thanks,
Prateek
On Wed, Jan 2, 2019 at 11:40 PM Deshpande, Omkar <[email protected]> wrote:
>
> Hello,
>
> I have been able to run my Samza-Beam application in local mode, and now
> I am trying to run it in standalone mode.
>
> Here is my config file, config.properties:
>
> app.name=test-app
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> job.coordinator.zk.connect=localhost:2181
> job.coordinator.system=kafka
> job.factory.class=org.apache.samza.job.local.ProcessJobFactory
> # Kafka System
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
> systems.kafka.default.stream.replication.factor=1
>
> I am getting the following exception:
>
> org.apache.samza.config.ConfigException: Legacy task applications must set a non-empty task.class in configuration.
> at org.apache.samza.application.ApplicationUtil.fromConfig(ApplicationUtil.java:58)
> at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:87)
>
> Versions:
> <beam.version>2.9.0</beam.version>
> <samza.version>0.14.1</samza.version>
>
> As I understand it, I shouldn't have to create an implementation of
> StreamApplication or StreamTask when using the Beam SDK.
>
> An example config file for a Samza-Beam standalone application would be
> helpful.
>
> Regards,
> Omkar Deshpande