Hey folks,

We have a legacy application that we've run over the years on everything from Samza 0.14.1 (before it was legacy) up through Samza 1.5.1 (which we're currently using). I'm now trying to upgrade to 1.6.0+, and I can't for the life of me get a configuration using the ProcessJobFactory or ThreadJobFactory to start successfully (with app.runner.class=org.apache.samza.runtime.LocalApplicationRunner). The application gets to "ZkJobCoordinator Generating new JobModel with processors: [<ids>]" and then stops, with no errors and no indication of what went wrong. To be clear, this only happens in 1.6.0+.
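
For context, a setup like the one described would typically involve properties along these lines. This is only a sketch: app.runner.class is the value quoted above, while the remaining keys and values (the job factory line, the ZooKeeper coordinator factory, and the ZooKeeper address) are assumptions added for illustration and are not the actual configuration in use.

```properties
# Sketch only; NOT the actual configuration from this report.

# Quoted in the message above:
app.runner.class=org.apache.samza.runtime.LocalApplicationRunner

# One of the two job factories mentioned above (assumed form of the setting):
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
# job.factory.class=org.apache.samza.job.local.ProcessJobFactory

# Assumed: the ZkJobCoordinator entries in the log suggest ZooKeeper-based coordination.
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory

# Assumed ZooKeeper address; the message does not state it.
job.coordinator.zk.connect=localhost:2181
```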

I've tried digging through the commits between 1.5.1 and 1.6.0, but nothing stands out as an obvious problem, so I thought it worthwhile to ask the list directly. Are there any tests that use this combination? If not, or if this has been explicitly removed, then what is the preferred way to run a single instance of a streamtask locally?

I'm using Samza 1.6.0, Kafka 2.2.2, and ZooKeeper 3.4.14.

Thanks in advance for the help!

Cheers,
Malcolm McFarland
Cavulus

Here's a sample of the last few log messages, log level set to TRACE:

[INFO] Metadata Cluster ID: _6MEL_AQSBW7DpOjoOD95A
[DEBUG] Metadata Updated cluster metadata version 2 to Cluster(id = _6MEL_AQSBW7DpOjoOD95A, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null))
[TRACE] NetworkClient [Consumer clientId=kafka_admin_consumer-my_streamtask-1, groupId=my-streamtask-1] Completed receive from node -1 for METADATA with correlation id 0, received {throttle_time_ms=0,brokers=[{node_id=0,host=localhost,port=9092,rack=null}],cluster_id=_6MEL_AQSBW7DpOjoOD95A,controller_id=0,topic_metadata=[{error_code=0,topic=default,is_internal=false,partition_metadata=[{error_code=0,partition=0,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=5,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=1,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=4,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=6,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=7,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=2,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=3,leader=0,replicas=[0],isr=[0],offline_replicas=[]}]}]}
[DEBUG] KafkaSystemAdmin Stream default has partitions [Partition(topic = default, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = default, partition = 5, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = default, partition = 1, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = default, partition = 4, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = default, partition = 6, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = default, partition = 7, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = default, partition = 2, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = default, partition = 3, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
[INFO] KafkaSystemAdmin SystemStream partition counts for system kafka: {default=SystemStreamMetadata [streamName=default, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=1]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=2]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=3]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=4]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=5]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=6]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition [partition=7]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}]}
[DEBUG] MetricsRegistryMap Creating new gauge job-coordinator kafka-default-partitionCount 0.
[INFO] ScheduleAfterDebounceTime Trying to cancel the action: OnProcessorChange.
[INFO] ScheduleAfterDebounceTime Scheduled action: OnProcessorChange to run after: 0 milliseconds.
[INFO] ZkUtils subscribing for jm version change at:/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet:: clientPath:null serverPath:null finished:false header:: 14,8 replyHeader:: 14,832,0 request:: '/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/processors,T response:: v{'0000000024}
[INFO] ZkUtils Found these children - [0000000024]
[DEBUG] ZkUtils Active ProcessorZNodes in zookeeper: [0000000024].
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet:: clientPath:null serverPath:null finished:false header:: 15,3 replyHeader:: 15,832,0 request:: '/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T response:: s{595,595,1683229357544,1683229357544,0,0,0,0,0,0,595}
[DEBUG] ZkClient Subscribed data changes for /app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet:: clientPath:null serverPath:null finished:false header:: 16,4 replyHeader:: 16,832,0 request:: '/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/processors/0000000024,F response:: #69702d3139322d3136382d38382d38352e75732d776573742d322e636f6d707574652e696e7465726e616c2065303462363533622d323537302d343064632d396330642d353539633139386631633630,s{832,832,1683248966132,1683248966132,0,0,0,72244096555810909,80,0,832}
[INFO] ZkJobCoordinator Generating new JobModel with processors: [e04b653b-2570-40dc-9c0d-559c198f1c60].
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet:: clientPath:null serverPath:null finished:false header:: 17,4 replyHeader:: 17,832,0 request:: '/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T response:: ,s{595,595,1683229357544,1683229357544,0,0,0,0,0,0,595}