Hello Xinyu,

After adding this property, the Beam pipeline is not getting triggered. Here are the logs: https://intuit.app.box.com/s/82cjpw9zabkcrhr1rfdg8mn8hc3zif89
This might be the relevant log snippet:

Jan 03, 2019 5:09:46 PM org.apache.samza.zk.ZkJobCoordinator lambda$onNewJobModelAvailable$3
INFO: New JobModel does not contain pid=8303d1c2-d616-4794-a751-fb55ac842522. Stopping this processor. New JobModel: JobModel [config={}, containers={0=ContainerModel [processorId=0, tasks={Partition 0=TaskModel [taskName=Partition 0, systemStreamPartitions=[SystemStreamPartition [0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, 0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, 0]], changeLogPartition=Partition [partition=0]]}]}]
Jan 03, 2019 5:09:46 PM org.apache.samza.zk.ZkJobCoordinator stop
INFO: Shutting down Job Coordinator...

And:

INFO: Action: JobModelVersionChange completed successfully.
Exception in thread "main" java.lang.NullPointerException
    at org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:106)
    at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:78)

Let me know if I am missing something.
And, btw, without job.factory.class=org.apache.samza.job.local.ProcessJobFactory, I get this exception:

Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
    at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:88)
    at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:112)
    at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:47)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at com.intuit.dedupe.beam.poc.StreamingWordCount.main(StreamingWordCount.java:151)
Caused by: org.apache.samza.SamzaException: no job factory class defined
    at org.apache.samza.job.JobRunner.getJobFactory(JobRunner.scala:173)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:80)
    at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:85)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:82)
    ... 5 more

Regards,
Omkar Deshpande

From: Xinyu Liu <xinyuliu...@gmail.com>
Date: Thursday, January 3, 2019 at 3:25 PM
To: "dev@samza.apache.org" <dev@samza.apache.org>, "Deshpande, Omkar" <omkar_deshpa...@intuit.com>
Subject: Re: app.class or task.class for beam samza runner

Adding Omkar's email back to this list.

For the later error, I think you need to add the following config, since you are running in standalone mode:

app.runner.class=org.apache.samza.runtime.LocalApplicationRunner

Please keep us updated if you run into any further issues.

Thanks,
Xinyu

On Thu, Jan 3, 2019 at 12:14 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:

As Prateek mentioned, I also double-checked the exception, which comes from a class (ApplicationUtil.java) that only exists in Samza 1.0. Please remove any Samza 1.0 dependency, since the Beam API currently works with Samza 0.14.1.
Your config looks mostly correct to me. The following is not needed:

job.factory.class=org.apache.samza.job.local.ProcessJobFactory

And you probably need to set this for any data repartitioning:

job.default.system=kafka

Thanks,
Xinyu

On Thu, Jan 3, 2019 at 10:03 AM Prateek Maheshwari <prateek...@gmail.com> wrote:

Hi Omkar,

I think it's only possible to get that exception with Samza 1.0. Can you verify that the deployment is indeed using Samza 0.14.1?

Thanks,
Prateek

On Wed, Jan 2, 2019 at 11:40 PM Deshpande, Omkar <omkar_deshpa...@intuit.com> wrote:
>
> Hello,
>
> I have been able to execute my Samza-Beam application in Local mode, and now
> I am trying to run a Samza-Beam application in Standalone mode.
>
> Here is my configFile config.properties:
>
> app.name=test-app
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> job.coordinator.zk.connect=localhost:2181
> job.coordinator.system=kafka
> job.factory.class=org.apache.samza.job.local.ProcessJobFactory
> # Kafka System
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
> systems.kafka.default.stream.replication.factor=1
>
> I am getting the following exception:
>
> org.apache.samza.config.ConfigException: Legacy task applications must set a non-empty task.class in configuration.
>     at org.apache.samza.application.ApplicationUtil.fromConfig(ApplicationUtil.java:58)
>     at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:87)
>
> Versions:
> <beam.version>2.9.0</beam.version>
> <samza.version>0.14.1</samza.version>
>
> As per my understanding, I shouldn’t have to create an implementation of
> StreamApplication or StreamTask while using the Beam SDK.
>
> An example config file for a Samza-Beam Standalone application would be
> helpful.
>
> Regards,
> Omkar Deshpande
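For reference, here is a sketch of Omkar's config.properties with the changes suggested in this thread applied: job.factory.class removed, app.runner.class set to LocalApplicationRunner, and job.default.system=kafka added. It assumes only Samza 0.14.1 artifacts are on the classpath, per Prateek's and Xinyu's replies. This is assembled from the advice in the thread and is not a verified working config: per the most recent message, the pipeline still did not trigger (and waitUntilFinish threw a NullPointerException) after app.runner.class was added.

```properties
# Sketch only: Omkar's original config with the thread's suggestions applied.
# Not verified to work; see the NullPointerException reported in the latest message.
app.name=test-app

# Standalone mode: use LocalApplicationRunner instead of the
# RemoteApplicationRunner seen in the "no job factory class defined" trace.
app.runner.class=org.apache.samza.runtime.LocalApplicationRunner

# ZooKeeper-based coordination between standalone processors.
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=localhost:2181
job.coordinator.system=kafka

# Per Xinyu, probably needed for any data repartitioning.
job.default.system=kafka

# job.factory.class is intentionally omitted; per Xinyu it is not needed here.

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
```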