It’s working after adding
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory

Thanks a lot for your help.
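The log quoted further down shows why the grouper setting matters. The new JobModel has a single container keyed "0" (processorId=0), but this processor registered with a UUID pid. It does not find itself in the model, so it stops. The sketch that follows is a simplified, hypothetical model of that membership check, not Samza's actual code. It assumes the grouper in effect before the fix numbered containers by count, while GroupByContainerIds keys them by the registered processor IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of the check behind the log line
// "New JobModel does not contain pid=...". This is NOT Samza's implementation.
public class JobModelMembership {

    // Count-based grouping (assumed pre-fix behavior): container IDs are
    // "0".."n-1", regardless of which processors actually joined.
    static Map<String, List<String>> groupByCount(List<String> tasks, int containerCount) {
        Map<String, List<String>> model = new TreeMap<>();
        for (int i = 0; i < tasks.size(); i++) {
            String id = String.valueOf(i % containerCount);
            model.computeIfAbsent(id, k -> new ArrayList<>()).add(tasks.get(i));
        }
        return model;
    }

    // ID-based grouping: container IDs are the processor IDs registered in ZooKeeper.
    static Map<String, List<String>> groupByIds(List<String> tasks, List<String> processorIds) {
        Map<String, List<String>> model = new TreeMap<>();
        for (int i = 0; i < tasks.size(); i++) {
            String id = processorIds.get(i % processorIds.size());
            model.computeIfAbsent(id, k -> new ArrayList<>()).add(tasks.get(i));
        }
        return model;
    }

    // A processor keeps running only if its own ID is a key in the new model.
    static boolean keepsRunning(Map<String, List<String>> model, String pid) {
        return model.containsKey(pid);
    }

    public static void main(String[] args) {
        String pid = "8303d1c2-d616-4794-a751-fb55ac842522";
        List<String> tasks = List.of("Partition 0");
        System.out.println(keepsRunning(groupByCount(tasks, 1), pid));          // false
        System.out.println(keepsRunning(groupByIds(tasks, List.of(pid)), pid)); // true
    }
}
```

In this model, the only difference between the two runs is how container IDs are chosen, which matches the symptom in the log: the work is assigned to container "0", which no running processor claims.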

Here is my working configuration –

app.name=test-app
app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory

# Job
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=localhost:2181
job.coordinator.system=kafka
job.default.system=kafka

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
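To catch the same mistakes earlier, a config file can be checked against the settings this thread converged on before launching the pipeline. The following is a minimal sketch using only java.util.Properties. The class name StandaloneConfigCheck and its rules are hypothetical and are not part of Samza or Beam. It only checks key presence and values, not whether Samza accepts the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Samza or Beam): checks a config.properties
// file against the standalone settings worked out in this thread.
public class StandaloneConfigCheck {

    // The working configuration from this message, verbatim.
    static final String WORKING_CONFIG = String.join("\n",
        "app.name=test-app",
        "app.runner.class=org.apache.samza.runtime.LocalApplicationRunner",
        "task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory",
        "job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory",
        "job.coordinator.zk.connect=localhost:2181",
        "job.coordinator.system=kafka",
        "job.default.system=kafka",
        "systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory",
        "systems.kafka.consumer.zookeeper.connect=localhost:2181",
        "systems.kafka.producer.bootstrap.servers=localhost:9092",
        "systems.kafka.default.stream.replication.factor=1");

    // Key/value pairs the thread settled on for standalone (ZooKeeper) mode.
    static final String[][] REQUIRED = {
        {"app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"},
        {"job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory"},
        {"task.name.grouper.factory",
         "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"},
    };

    // Parses properties text; a StringReader will not fail, so rethrow unchecked.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns one message per problem; an empty list means the config matches.
    static List<String> problems(Properties props) {
        List<String> out = new ArrayList<>();
        for (String[] kv : REQUIRED) {
            String actual = props.getProperty(kv[0]);
            if (!kv[1].equals(actual)) {
                out.add(kv[0] + " should be " + kv[1] + " (found: " + actual + ")");
            }
        }
        if (props.getProperty("job.default.system") == null) {
            out.add("job.default.system is missing (needed for repartitioning)");
        }
        if (props.getProperty("job.factory.class") != null) {
            out.add("job.factory.class is set but not needed with LocalApplicationRunner");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> found = problems(parse(WORKING_CONFIG));
        // Prints "OK" for the working configuration above.
        System.out.println(found.isEmpty() ? "OK" : String.join("\n", found));
    }
}
```

Run against the original config.properties from the first message in this thread, the same check would report four problems: the missing app.runner.class, task.name.grouper.factory and job.default.system, plus the unneeded job.factory.class.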


From: "Deshpande, Omkar" <omkar_deshpa...@intuit.com>
Date: Thursday, January 3, 2019 at 5:24 PM
To: "dev@samza.apache.org" <dev@samza.apache.org>
Subject: Re: app.class or task.class for beam samza runner

Hello Xinyu,

After adding this property, the Beam pipeline is not getting triggered.
Here are the logs:
https://intuit.app.box.com/s/82cjpw9zabkcrhr1rfdg8mn8hc3zif89

This might be the relevant log snippet:
Jan 03, 2019 5:09:46 PM org.apache.samza.zk.ZkJobCoordinator lambda$onNewJobModelAvailable$3
INFO: New JobModel does not contain pid=8303d1c2-d616-4794-a751-fb55ac842522. Stopping this processor. New JobModel: JobModel [config={}, containers={0=ContainerModel [processorId=0, tasks={Partition 0=TaskModel [taskName=Partition 0, systemStreamPartitions=[SystemStreamPartition [0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, 0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, 0]], changeLogPartition=Partition [partition=0]]}]}]
Jan 03, 2019 5:09:46 PM org.apache.samza.zk.ZkJobCoordinator stop
INFO: Shutting down Job Coordinator...

And:
INFO: Action: JobModelVersionChange completed successfully.
Exception in thread "main" java.lang.NullPointerException
        at org.apache.beam.runners.samza.SamzaPipelineResult.getStateInfo(SamzaPipelineResult.java:106)
        at org.apache.beam.runners.samza.SamzaPipelineResult.waitUntilFinish(SamzaPipelineResult.java:78)

Let me know if I am missing something.



Also, without job.factory.class=org.apache.samza.job.local.ProcessJobFactory, I get this exception:
Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:88)
        at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:112)
        at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:47)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
        at com.intuit.dedupe.beam.poc.StreamingWordCount.main(StreamingWordCount.java:151)
Caused by: org.apache.samza.SamzaException: no job factory class defined
        at org.apache.samza.job.JobRunner.getJobFactory(JobRunner.scala:173)
        at org.apache.samza.job.JobRunner.run(JobRunner.scala:80)
        at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:85)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:82)
        ... 5 more


Regards,
Omkar Deshpande

From: Xinyu Liu <xinyuliu...@gmail.com>
Date: Thursday, January 3, 2019 at 3:25 PM
To: "dev@samza.apache.org" <dev@samza.apache.org>, "Deshpande, Omkar" 
<omkar_deshpa...@intuit.com>
Subject: Re: app.class or task.class for beam samza runner

Adding Omkar's email back to this list.

For your later error, I think you need to add the following config, since you are running in standalone mode:

app.runner.class=org.apache.samza.runtime.LocalApplicationRunner



Please keep us updated if you run into any further issues.

Thanks,

Xinyu

On Thu, Jan 3, 2019 at 12:14 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
As Prateek mentioned, I also double-checked the exception, which comes from a class (ApplicationUtil.java) that only exists in Samza 1.0. Please remove any Samza 1.0 dependency, since the Beam API currently works with Samza 0.14.1.

Your config looks mostly correct to me. The following is not needed:

job.factory.class=org.apache.samza.job.local.ProcessJobFactory

And you probably need to configure this for any data repartitioning:

job.default.system=kafka

Thanks,
Xinyu


On Thu, Jan 3, 2019 at 10:03 AM Prateek Maheshwari <prateek...@gmail.com> wrote:
Hi Omkar,

I think it's only possible to get that exception with Samza 1.0. Can
you verify that the deployment is indeed using Samza 0.14.1?

Thanks,
Prateek

On Wed, Jan 2, 2019 at 11:40 PM Deshpande, Omkar <omkar_deshpa...@intuit.com> wrote:
>
> Hello,
>
> I have been able to run my Samza-Beam application in local mode, and now
> I am trying to run a Samza-Beam application in standalone mode.
>
> Here is my config file, config.properties:
>
> app.name=test-app
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> job.coordinator.zk.connect=localhost:2181
> job.coordinator.system=kafka
> job.factory.class=org.apache.samza.job.local.ProcessJobFactory
> # Kafka System
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
> systems.kafka.default.stream.replication.factor=1
>
> I am getting the following exception:
>
> org.apache.samza.config.ConfigException: Legacy task applications must set a non-empty task.class in configuration.
>       at org.apache.samza.application.ApplicationUtil.fromConfig(ApplicationUtil.java:58)
>       at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:87)
>
> Versions:
>         <beam.version>2.9.0</beam.version>
>         <samza.version>0.14.1</samza.version>
>
> As I understand it, I shouldn’t have to create an implementation of
> StreamApplication or StreamTask when using the Beam SDK.
>
> An example config file for a Samza-Beam standalone application would be
> helpful.
>
> Regards,
> Omkar Deshpande
