Glad you were able to figure it out; that was very confusing. Thanks for the fix too.
- Prateek On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges <tstump...@ntent.com> wrote: > And that last issue was mine. My setting override was not picked up and it > was using GroupByContainerCount instead. > -Thanks, > Thunder > > > -----Original Message----- > From: Thunder Stumpges > Sent: Monday, March 19, 2018 20:58 > To: dev@samza.apache.org > Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; t...@recursivedream.com; > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> > Subject: RE: Old style "low level" Tasks with alternative deployment > model(s) > > Well I figured it out. My specific issue was due to a simple dependency > problem where I had gotten an older version of the Jackson-mapper library. > However the code was throwing NoSuchMethodError (an Error instead of > Exception) and being silently dropped. I created a pull request to handle > any Throwable in ScheduleAfterDebounceTime. > https://github.com/apache/samza/pull/450 > > I'm now running into an issue with the generation of the JobModel and the > ProcessorId. The ZkJobCoordinator has a ProcessorId that is a Guid, but > when GroupByContainerIds class (my TaskNameGrouper) creates the > ContainerModels, it is using the ContainerId (a numeric value, 0,1,2,etc) > as the ProcessorId (~ line 105). This results in the JobModel that is > generated and published immediately causing the processor to quit with this > message: > > INFO o.apache.samza.zk.ZkJobCoordinator - New JobModel does not contain > pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor. > > I was assuming I should be using GroupByContainerIds as my > TaskNameGrouper. I don't see any other promising implementations. Am I just > missing something? > > Thanks, > Thunder > > JobModel > { > "config" : { > ... 
> }, > "containers" : { > "0" : { > "tasks" : { > "Partition 0" : { > "task-name" : "Partition 0", > "system-stream-partitions" : [ { > "system" : "kafka", > "partition" : 0, > "stream" : "test_topic1" > }, { > "system" : "kafka", > "partition" : 0, > "stream" : "test_topic2" > } ], > "changelog-partition" : 0 > }, > "Partition 1" : { > "task-name" : "Partition 1", > "system-stream-partitions" : [ { > "system" : "kafka", > "partition" : 1, > "stream" : "test_topic1" > }, { > "system" : "kafka", > "partition" : 1, > "stream" : "test_topic2" > } ], > "changelog-partition" : 1 > } > }, > "container-id" : 0, > "processor-id" : "0" > } > }, > "max-change-log-stream-partitions" : 2, > "all-container-locality" : { > "0" : null > } > } > > -----Original Message----- > From: Thunder Stumpges [mailto:tstump...@ntent.com] > Sent: Friday, March 16, 2018 18:21 > To: dev@samza.apache.org > Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; t...@recursivedream.com; > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> > Subject: RE: Old style "low level" Tasks with alternative deployment > model(s) > > Attached. I don't see any threads actually running this code which is odd. > > There's my main thread that's waiting for the whole thing to finish, the > "debounce-thread-0" (which logged the other surrounding messages below) has > this: > > "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800 > nid=0x21 waiting on condition [0x00007fa0d0d45000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000006f166e350> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.park(LockSupport. 
> java:175) > at java.util.concurrent.locks.AbstractQueuedSynchronizer$ > ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at java.util.concurrent.ScheduledThreadPoolExecutor$ > DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081) > at java.util.concurrent.ScheduledThreadPoolExecutor$ > DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) > at java.util.concurrent.ThreadPoolExecutor.getTask( > ThreadPoolExecutor.java:1067) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1127) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > Locked ownable synchronizers: > - None > > Thanks for having a look. > Thunder > > > -----Original Message----- > From: Prateek Maheshwari [mailto:prateek...@gmail.com] > Sent: Friday, March 16, 2018 17:02 > To: dev@samza.apache.org > Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; t...@recursivedream.com; > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> > Subject: Re: Old style "low level" Tasks with alternative deployment > model(s) > > Hi Thunder, > > Can you please take and attach a thread dump with this? > > Thanks, > Prateek > > On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges <tstump...@ntent.com> > wrote: > > > It appears it IS hung while serializing the JobModel... very strange! > > I added some debug statements around the calls: > > > > LOG.debug("Getting object mapper to serialize job model"); // > > this IS printed > > ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); > > LOG.debug("Serializing job model"); // this IS printed > > String jobModelStr = mmapper.writerWithDefaultPrettyPrinter > > ().writeValueAsString(jobModel); > > LOG.info("jobModelAsString=" + jobModelStr); // this is NOT > printed! > > > > Another thing I noticed is that "getObjectMapper" actually creates the > > object mapper twice! 
> > > > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG > > org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job > > model > > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Creating new object mapper and simple module > > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Adding SerDes and mixins > > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Adding custom ContainerModel deserializer > > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Setting up naming strategy and registering module > > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Done! > > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Creating new object mapper and simple module > > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Adding SerDes and mixins > > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Adding custom ContainerModel deserializer > > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Setting up naming strategy and registering module > > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG > > o.a.s.s.model.SamzaObjectMapper > > - Done! > > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG > > org.apache.samza.zk.ZkUtils - Serializing job model > > > > Could this ObjectMapper be a singleton? I see there is a private > > static instance, but getObjectMapper creates a new one every time... > > > > Anyway, then it takes off to serialize the job model and never comes > > back... > > > > Hoping someone has some idea here... 
the implementation for this > > mostly comes from Jackson-mapper-asl, and I have the version that is > > linked in the > > 0.14.0 tag: > > | | | +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13 > > | | | | \--- org.codehaus.jackson:jackson-core-asl:1.9.13 > > > > Thanks! > > Thunder > > > > -----Original Message----- > > From: Thunder Stumpges [mailto:tstump...@ntent.com] > > Sent: Friday, March 16, 2018 15:29 > > To: dev@samza.apache.org; Jagadish Venkatraman > > <jagadish1...@gmail.com> > > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan < > > nickpa...@gmail.com> > > Subject: RE: Old style "low level" Tasks with alternative deployment > > model(s) > > > > So, my investigation starts at StreamProcessor.java. Line 294 in > > method > > onNewJobModel() logs an INFO message that it's starting a container. > > This message never appears. > > > > I see that ZkJobCoordinator calls onNewJobModel from its > > onNewJobModelConfirmed method which also logs an info message stating > > "version X of the job model got confirmed". I never see this message > > either, so I go up the chain some more. > > > > I DO see: > > > > 2018-03-16 21:43:58 logback 20498 > > [ZkClient-EventThread-13-10.0.127.114:2181] > > INFO o.apache.samza.zk.ZkJobCoordinator - > > ZkJobCoordinator::onBecomeLeader > > - I became the leader! > > And > > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO > > o.apache.samza.zk.ZkJobCoordinator - > > pid=91e07d20-ae33-4156-a5f3-534a95642133Generated > > new Job Model. Version = 1 > > > > Which led me to method onDoProcessorChange line 210. I see that line, > > but not the line below " Published new Job Model. Version =" so > > something in here is not completing: > > > > LOG.info("pid=" + processorId + "Generated new Job Model. 
Version = " > > + nextJMVersion); > > > > // Publish the new job model > > zkUtils.publishJobModel(nextJMVersion, jobModel); > > > > // Start the barrier for the job model update > > barrier.create(nextJMVersion, currentProcessorIds); > > > > // Notify all processors about the new JobModel by updating > > JobModel Version number > > zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion); > > > > LOG.info("pid=" + processorId + "Published new Job Model. Version = " > > + nextJMVersion); > > > > As I mentioned, after the line "Generated new Job Model. Version = 1" > > I just get repeated zk ping responses.. no more application logging. > > > > The very next thing that's run is zkUtils.publishJobModel() which only > > has two lines before another log statement (which I don't see): > > > > public void publishJobModel(String jobModelVersion, JobModel jobModel) > { > > try { > > ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); > > String jobModelStr = mmapper.writerWithDefaultPrettyPrinter > > ().writeValueAsString(jobModel); > > LOG.info("jobModelAsString=" + jobModelStr); > > ... > > > > Could it really be getting hung up on one of these two lines? (seems > > like it must be, but I don't see anything there that seems like it > > would just hang). I'll keep troubleshooting, maybe add some more debug > > logging and try again. > > > > Thanks for any guidance you all might have. > > -Thunder > > > > > > -----Original Message----- > > From: Thunder Stumpges [mailto:tstump...@ntent.com] > > Sent: Friday, March 16, 2018 14:43 > > To: dev@samza.apache.org; Jagadish Venkatraman > > <jagadish1...@gmail.com> > > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan < > > nickpa...@gmail.com> > > Subject: RE: Old style "low level" Tasks with alternative deployment > > model(s) > > > > Well I have my stand-alone application in docker and running in > > kubernetes. 
I think something isn't wired up all the way though, > > because my task never actually gets invoked. I see no errors, however > > I'm not getting the usual startup logs (checking existing offsets, > > "entering run loop"...) My logs look like this: > > > > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO > > kafka.utils.VerifiableProperties > > - Verifying properties > > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO > > kafka.utils.VerifiableProperties > > - Property client.id is overridden to samza_admin-test_stream_task-1 > > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO > > kafka.utils.VerifiableProperties > > - Property metadata.broker.list is overridden to > > test-kafka-kafka.test-svc:9092 > > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO > > kafka.utils.VerifiableProperties > > - Property request.timeout.ms is overridden to 30000 > > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO > > kafka.client.ClientUtils$ - Fetching metadata from broker > > BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0 > > for 1 topic(s) Set(dev_k8s.samza.test.topic) > > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG > > kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000 > > (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF = > > 102400 (requested 102400), connectTimeoutMs = 30000. 
> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO > > kafka.producer.SyncProducer - Connected to > > test-kafka-kafka.test-svc:9092 for producing > > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO > > kafka.producer.SyncProducer - Disconnecting from > > test-kafka-kafka.test-svc:9092 > > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG > > kafka.client.ClientUtils$ - Successfully fetched metadata for 1 > > topic(s) > > Set(dev_k8s.samza.test.topic) > > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper > > org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc > > has grouped the SystemStreamPartitions into 10 tasks with the > > following > > taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, > > Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, > > Partition 8] > > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being > > assigned changelog partition 0. > > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being > > assigned changelog partition 1. > > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being > > assigned changelog partition 2. > > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being > > assigned changelog partition 3. > > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being > > assigned changelog partition 4. > > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being > > assigned changelog partition 5. 
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being > > assigned changelog partition 6. > > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being > > assigned changelog partition 7. > > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being > > assigned changelog partition 8. > > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO > > o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being > > assigned changelog partition 9. > > 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)] > > DEBUG org.apache.zookeeper.ClientCnxn - Reading reply > > sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null > > finished:false header:: 23,4 replyHeader:: 23,14024,0 request:: > > '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/ > > JobModelGeneration/jobModelVersion,T response:: > > ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878} > > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO > > o.apache.samza.zk.ZkJobCoordinator - > > pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated > > new Job Model. Version = 1 > > 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)] > > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: > > 0x1622c8b5fc01ac7 after 2ms > > 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)] > > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: > > 0x1622c8b5fc01ac7 after 1ms > > 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)] > > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: > > 0x1622c8b5fc01ac7 after 2ms ... > > > > The zk ping responses continue every 10 seconds, but no other activity > > or messages occur. 
> > It looks like it gets as far as confirming the JobModel and grouping > > the partitions, but nothing actually starts up. > > > > Any ideas? > > Thanks in advance! > > Thunder > > > > > > -----Original Message----- > > From: Thunder Stumpges [mailto:tstump...@ntent.com] > > Sent: Thursday, March 15, 2018 16:35 > > To: Jagadish Venkatraman <jagadish1...@gmail.com> > > Cc: dev@samza.apache.org; t...@recursivedream.com; yi...@linkedin.com; > > Yi Pan <nickpa...@gmail.com> > > Subject: RE: Old style "low level" Tasks with alternative deployment > > model(s) > > > > Thanks a lot for the info. I have something basically working at this > > point! I have not integrated it with Docker nor Kubernetes yet, but it > > does run from my local machine. > > > > I have determined that LocalApplicationRunner does NOT do config > > rewriting. I had to write my own little “StandAloneApplicationRunner” > > that handles the “main” entrypoint. It does command parsing using > > CommandLine, load config from ConfigFactory, and perform rewriting > > before creating the new instance of LocalApplicationRunner. This is > > all my StandAloneApplicationRunner contains: > > > > > > object StandAloneSamzaRunner extends App with LazyLogging { > > > > // parse command line args just like JobRunner. > > val cmdline = new ApplicationRunnerCommandLine > > val options = cmdline.parser.parse(args: _*) > > val config = cmdline.loadConfig(options) > > > > val runner = new LocalApplicationRunner(Util.rewriteConfig(config)) > > runner.runTask() > > runner.waitForFinish() > > } > > > > The only config settings I needed to make to use this runner were > > (easily configured due to our central Consul config system and our > rewriter) : > > > > # use the ZK based job coordinator > > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory > > # need to use GroupByContainerIds instead of GroupByContainerCount > > task.name.grouper.factory=org.apache.samza.container.grouper.task. 
> > GroupByContainerIdsFactory > > # ZKJC config > > job.coordinator.zk.connect=<our_zk_connection> > > > > I did run into one potential problem; as you see above, I have started > > the task using runTask() and then to prevent my main method from > > returning, I have called waitForFinish(). The first time I ran it, the > > job itself failed because I had forgotten to override the task > > grouper, and container count was pulled from our staging environment. > > There are some failures logged and it appears the JobCoordinator > > fails, but it never returns from waitForFinish. Stack trace and > continuation of log is below: > > > > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR > > o.a.s.zk.ScheduleAfterDebounceTime > > - Execution of action: OnProcessorChange failed. > > java.lang.IllegalArgumentException: Your container count (4) is larger > > than your task count (2). Can't have containers with nothing to do, so > > aborting. > > at org.apache.samza.container.grouper.task.GroupByContainerCount. > > validateTasks(GroupByContainerCount.java:212) > > at org.apache.samza.container.grouper.task. > > GroupByContainerCount.group(GroupByContainerCount.java:62) > > at org.apache.samza.container.grouper.task.TaskNameGrouper. > > group(TaskNameGrouper.java:56) > > at org.apache.samza.coordinator.JobModelManager$.readJobModel( > > JobModelManager.scala:266) > > at org.apache.samza.coordinator.JobModelManager.readJobModel( > > JobModelManager.scala) > > at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel( > > ZkJobCoordinator.java:306) > > at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange( > > ZkJobCoordinator.java:197) > > at org.apache.samza.zk.ZkJobCoordinator$ > LeaderElectorListenerImpl. > > lambda$onBecomingLeader$0(ZkJobCoordinator.java:318) > > at org.apache.samza.zk.ScheduleAfterDebounceTime. > > lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134) > > at java.util.concurrent.Executors$RunnableAdapter. 
> > call$$$capture(Executors.java:511) > > at java.util.concurrent.Executors$RunnableAdapter. > > call(Executors.java) > > at java.util.concurrent.FutureTask.run$$$capture( > > FutureTask.java:266) > > at java.util.concurrent.FutureTask.run(FutureTask.java) > > at java.util.concurrent.ScheduledThreadPoolExecutor$ > > ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > > at java.util.concurrent.ScheduledThreadPoolExecutor$ > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > > ThreadPoolExecutor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > > ThreadPoolExecutor.java:617) > > at java.lang.Thread.run(Thread.java:745) > > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG > > o.a.samza.processor.StreamProcessor - Container is not instantiated yet. > > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG > > org.I0Itec.zkclient.ZkClient - Closing ZkClient... > > 2018-03-15 22:34:32 logback 77789 > > [ZkClient-EventThread-15-10.0.127.114:2181] > > INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event > thread. > > > > And then the application continues on with metric reporters, and other > > debug logging (not actually running the task though) > > > > Thanks in advance for the guidance, this has been easier than I imagined! > > I’ll report back when I get more of the Dockerization/Kubernetes > > running and test it a bit more. > > Cheers, > > Thunder > > > > > > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com] > > Sent: Thursday, March 15, 2018 14:46 > > To: Thunder Stumpges <tstump...@ntent.com> > > Cc: dev@samza.apache.org; t...@recursivedream.com; yi...@linkedin.com; > > Yi Pan <nickpa...@gmail.com> > > Subject: Re: Old style "low level" Tasks with alternative deployment > > model(s) > > > > >> Thanks for the info on the tradeoffs. That makes a lot of sense. 
> > >> I am on-board with using ZkJobCoordinator, sounds like some good
> > >> benefits over just the Kafka high-level consumer.
> >
> > This certainly looks like the simplest alternative.
> >
> > For your other questions, please find my answers inline.
> >
> > >> Q1: If I use LocalApplicationRunner, It does not use
> > >> "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
> >
> > Your understanding is correct. It directly instantiates the
> > StreamProcessor, which in turn creates and runs the SamzaContainer.
> >
> > >> Q2: If I use LocalApplicationRunner, I will need to code myself the
> > >> loading and rewriting of the Config that is currently handled by
> > >> JobRunner, correct?
> >
> > I don't think you'll need to do this. IIUC, the LocalApplicationRunner
> > should automatically invoke rewriters and do the right thing.
> >
> > >> Q3: Do I need to also handle coordinator stream(s) and storing of
> > >> config that is done in JobRunner (I don’t think so as the ?
> >
> > I don't think this is necessary either. The creation of the coordinator
> > stream and the persisting of configuration happen in the
> > LocalApplicationRunner (more specifically in
> > StreamManager#createStreams).
> >
> > >> Q4: Where/How do I specify the Container ID for each instance? Is
> > >> there a config setting that I can pass, (or pull from an env variable
> > >> and add to the config) ? I am assuming it is my responsibility to
> > >> ensure that each instance is started with a unique container ID..?
> >
> > Nope. If you are using the ZkJobCoordinator, you need not worry about
> > assigning IDs for each instance. The framework will automatically take
> > care of generating IDs and reaching consensus by electing a leader. If
> > you are curious, please take a look at implementations of the
> > ProcessorIdGenerator interface.
> >
> > Please let us know should you have further questions!
> > > > Best, > > Jagdish > > > > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges > > <tstump...@ntent.com <mailto:tstump...@ntent.com>> wrote: > > > > Thanks for the info on the tradeoffs. That makes a lot of sense. I am > > on-board with using ZkJobCoordinator, sounds like some good benefits > > over just the Kafka high-level consumer. > > > > > > > > To that end, I have made some notes on possible approaches based on > > the previous thread, and from my look into the code. I’d love to get > feedback. > > > > > > > > Approach 1. Configure jobs to use “ProcessJobFactory” and run > > instances of the job using run-job.sh or using JobRunner directly. > > > > I don’t think this makes sense from what I can see for a few reasons: > > > > * JobRunner is concerned with stuff I don't *think* we need: > > > > * coordinatorSystemProducer|Consumer, > > * writing/reading the configuration to the coordinator streams > > > > * ProcessJobFactory hard-codes the ID to “0” so I don’t think that > > will work for multiple instances. > > > > > > > > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and > > invoke > > LocalApplicationRunner.runTask() > > > > > > > > Q1: If I use LocalApplicationRunner, It does not use > > "ProcessJobFactory" (or any StreamJob or *Job classes) correct? > > > > Q2: If I use LocalApplicationRunner, I will need to code myself > > the loading and rewriting of the Config that is currently handled by > > JobRunner, correct? > > > > Q3: Do I need to also handle coordinator stream(s) and storing of > > config that is done in JobRunner (I don’t think so as the ? > > > > Q4: Where/How do I specify the Container ID for each instance? Is > > there a config setting that I can pass, (or pull from an env variable > > and add to the config) ? I am assuming it is my responsibility to > > ensure that each instance is started with a unique container ID..? 
> > > > I am getting started on the above (Approach 2.), and looking closer at > > the code so I may have my own answers to my questions, but figured I > > should go ahead and ask now anyway. Thanks! > > > > -Thunder > > > > > > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com<mailto: > > jagadish1...@gmail.com>] > > Sent: Thursday, March 15, 2018 1:41 > > To: dev@samza.apache.org<mailto:dev@samza.apache.org>; Thunder > > Stumpges < tstump...@ntent.com<mailto:tstump...@ntent.com>>; > > t...@recursivedream.com <mailto:t...@recursivedream.com> > > Cc: yi...@linkedin.com<mailto:yi...@linkedin.com>; Yi Pan < > > nickpa...@gmail.com<mailto:nickpa...@gmail.com>> > > > > Subject: Re: Old style "low level" Tasks with alternative deployment > > model(s) > > > > >> You are correct that this is focused on the higher-level API but > > >> doesn't > > preclude using the lower-level API. I was at the same point you were > > not long ago, in fact, and had a very productive conversation on the > > list > > > > Thanks Tom for linking the thread, and I'm glad that you were able to > > get Kubernetes integration working with Samza. > > > > >> If it is helpful for everyone, once I get the low-level API + > > >> ZkJobCoordinator + Docker + > > K8s working, I'd be glad to formulate an additional sample for > hello-samza. > > > > @Thunder Stumpges: > > We'd be thrilled to receive your contribution. Examples, demos, > > tutorials etc. > > contribute a great deal to improving the ease of use of Apache Samza. > > I'm happy to shepherd design discussions/code-reviews in the > > open-source including answering any questions you may have. > > > > > > >> One thing I'm still curious about, is what are the drawbacks or > > >> complexities of leveraging the Kafka High-level consumer + > > >> PassthroughJobCoordinator in a stand-alone setup like this? We do > > >> have Zookeeper (because of kafka) so I think either would work. 
> > >> The Kafka High-level consumer comes with other nice tools for
> > >> monitoring offsets, lag, etc
> >
> >
> > @Thunder Stumpges:
> >
> > Samza uses a "Job-Coordinator" to assign your input-partitions among
> > the different instances of your application so that they don't
> > overlap. A typical way to solve this "partition distribution" problem
> > is to have a single instance elected as a "leader" and have the leader
> > assign partitions to the group.
> > The ZkJobCoordinator uses Zk primitives to achieve this, while the
> > YarnJC relies on Yarn's guarantee that there will be a
> > singleton-AppMaster to achieve this.
> >
> > A key difference that separates the PassthroughJC from the Yarn/Zk
> > variants is that it does _not_ attempt to solve the "partition
> > distribution" problem. As a result, there's no leader-election
> > involved. Instead, it pushes the problem of "partition distribution"
> > to the underlying consumer.
> >
> > The PassthroughJC supports these 2 scenarios:
> >
> > 1. Consumer-managed partition distribution: When using the Kafka
> > high-level consumer (or an AWS KinesisClientLibrary consumer) with
> > Samza, the consumer manages partitions internally.
> >
> > 2. Static partition distribution: Alternatively, partitions can be
> > managed statically using configuration. You can achieve static
> > partition assignment by implementing a custom
> > SystemStreamPartitionGrouper
> > <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
> > and TaskNameGrouper
> > <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
> > Solutions in this category will typically require you to distinguish
> > the various processors in the group by providing an "id" for each.
> > Once the "id"s are decided, you can then statically compute
> > assignments using a function (e.g. modulo N).
> > You can rely on the following mechanisms to provide this id:
> > - Configure each instance differently to have its own id
> > - Obtain the id from the cluster-manager. For instance, Kubernetes
> > will provide each pod a unique id in the range [0,N). AWS ECS should
> > expose similar capabilities via a REST end-point.
> >
> > >> One thing I'm still curious about, is what are the drawbacks or
> > >> complexities of leveraging the Kafka High-level consumer +
> > >> PassthroughJobCoordinator in a stand-alone setup like this?
> >
> > Leveraging the Kafka High-level consumer:
> >
> > The Kafka high-level consumer is not integrated into Samza just yet.
> > Instead, Samza's integration with Kafka uses the low-level consumer
> > because
> > i) It allows for greater control in fetching data from individual
> > brokers. It is simple and performant in terms of the threading model
> > to have one thread pull from each broker.
> > ii) It is efficient in memory utilization since it does not do
> > internal-buffering of messages.
> > iii) There's no overhead like Kafka-controller heart-beats that are
> > driven by consumer.poll
> >
> > Since there's no built-in integration, you will have to build a new
> > SystemConsumer if you need to integrate with the Kafka High-level
> > consumer. Further, there's a fair bit of complexity to manage in
> > checkpointing.
> >
> > >> The Kafka High-level consumer comes with other nice tools for
> > >> monitoring offsets, lag, etc
> >
> > Samza exposes
> > <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
> > the metrics below for lag-monitoring:
> > - The current log-end offset for each partition
> > - The last check-pointed offset for each partition
> > - The number of messages behind the high watermark of the partition
> >
> > Please let us know if you need help discovering these or integrating
> > these with other systems/tools.
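The static partition distribution described above (each processor is given an id in [0,N) and computes its own share with a function such as modulo N) can be sketched in plain Java. This only illustrates the arithmetic. It does not implement Samza's SystemStreamPartitionGrouper or TaskNameGrouper interfaces, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Samza. */
public class StaticPartitionAssignment {

    /**
     * Returns the partitions owned by one processor, assuming processor ids
     * are 0..processorCount-1 and partitions are 0..partitionCount-1.
     * A partition p belongs to the processor whose id equals p % processorCount.
     */
    public static List<Integer> partitionsFor(int processorId, int processorCount, int partitionCount) {
        if (processorCount <= 0 || processorId < 0 || processorId >= processorCount) {
            throw new IllegalArgumentException("processorId must be in [0, processorCount)");
        }
        List<Integer> owned = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (partition % processorCount == processorId) {
                owned.add(partition);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int processorCount = 3;   // N instances of the application
        int partitionCount = 10;  // e.g. a 10-partition input topic
        for (int id = 0; id < processorCount; id++) {
            System.out.println("processor " + id + " -> "
                    + partitionsFor(id, processorCount, partitionCount));
        }
    }
}
```

Because every instance evaluates the same pure function, no leader election or coordination is needed, but the ids must be unique and N must be known to all instances. Changing N reshuffles most partitions, which matters for stateful jobs.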
> >
> > Leveraging the Passthrough JobCoordinator:
> >
> > It's helpful to split this discussion on tradeoffs with PassthroughJC
> > into 2 parts:
> >
> > 1. PassthroughJC + consumer-managed partitions:
> >
> > - In this model, Samza has no control over partition assignment since
> > it's managed by the consumer. This means that stateful operations like
> > joins that rely on partitions being co-located on the same task will
> > not work. Simple stateless operations (eg: map, filter, remote
> > lookups) are fine.
> >
> > - A key differentiator between Samza and other frameworks is our
> > support for "host
> > affinity<https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>".
> > Samza achieves this by assigning partitions to hosts taking
> > data locality into account. If the consumer can arbitrarily shuffle
> > partitions, it'd be hard to support this affinity/locality. Often
> > this is a key optimization when dealing with large stateful jobs.
> >
> > 2. PassthroughJC + static partitions:
> >
> > - In this model, it is possible to make stateful processing (including
> > host affinity) work by carefully choosing how "id"s are assigned and
> > computed.
> >
> > Recommendation:
> >
> > - Owing to the above subtleties, I would recommend that we give the
> > ZkJobCoordinator + the built-in low-level Kafka integration a try.
> > - If we hit snags down this path, we can certainly explore the
> > approach with PassthroughJC + static partitions.
> > - Using the PassthroughJC + consumer-managed distribution would be
> > least preferable owing to the subtleties I outlined above.
> >
> > Please let us know should you have more questions.
> >
> > Best,
> > Jagdish
> >
> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges
> > <tstump...@ntent.com> wrote:
> > Wow, what great timing, and what a great thread! I definitely have
> > some good starters to go off of here.
> >
> > If it is helpful for everyone, once I get the low-level API +
> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
> > additional sample for hello-samza.
> >
> > One thing I'm still curious about, is what are the drawbacks or
> > complexities of leveraging the Kafka High-level consumer +
> > PassthroughJobCoordinator in a stand-alone setup like this? We do have
> > Zookeeper (because of Kafka) so I think either would work. The Kafka
> > High-level consumer comes with other nice tools for monitoring
> > offsets, lag, etc....
> >
> > Thanks guys!
> > -Thunder
> >
> > -----Original Message-----
> > From: Tom Davis [mailto:t...@recursivedream.com]
> > Sent: Wednesday, March 14, 2018 17:50
> > To: dev@samza.apache.org
> > Subject: Re: Old style "low level" Tasks with alternative deployment
> > model(s)
> >
> > Hey there!
> >
> > You are correct that this is focused on the higher-level API but
> > doesn't preclude using the lower-level API. I was at the same point
> > you were not long ago, in fact, and had a very productive conversation
> > on the list: you should look for "Question about custom
> > StreamJob/Factory" in the list archive for the past couple months.
> >
> > I'll quote Jagadish Venkatraman from that thread:
> >
> > > For the section on the low-level API, can you use
> > > LocalApplicationRunner#runTask()? It basically creates a new
> > > StreamProcessor and runs it. Remember to provide task.class and set
> > > it to your implementation of StreamTask or AsyncStreamTask. Please
> > > note that this is an evolving API and hence, subject to change.
> >
> > I ended up just switching to the high-level API because I don't have
> > any existing Tasks and the Kubernetes story is a little more
> > straightforward there (there's only one container/configuration to
> > deploy).
> >
> > Best,
> >
> > Tom
> >
> > Thunder Stumpges <tstump...@ntent.com> writes:
> >
> > > Hi all,
> > >
> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing
> > > several processing pipelines. We have also begun a significant move
> > > of other services within our company to Docker/Kubernetes. Right now
> > > our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce"
> > > jobs (many reporting and other batch processing jobs). We would
> > > really like to move our stream processing off of Hadoop/Yarn and
> > > onto Kubernetes.
> > >
> > > When I just read about some of the new progress in .13 and .14 I got
> > > really excited! We would love to have our jobs run as simple
> > > libraries in our own JVM, and use the Kafka High-Level-Consumer for
> > > partition distribution and such. This would let us "dockerfy" our
> > > application and run/scale in Kubernetes.
> > >
> > > However as I read it, this new deployment model is ONLY for the
> > > new(er) High Level API, correct? Is there a plan and/or resources
> > > for adapting this back to existing low-level tasks? How complicated
> > > of a task is that? Do I have any other options to make this
> > > transition easier?
> > >
> > > Thanks in advance.
> > > Thunder
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University