Hey folks,

We have a legacy application that we've run over the years, starting on
Samza 0.14.1 (before it was legacy) and currently on Samza 1.5.1. I'm now
trying to upgrade to 1.6.0+, and I can't for the life of me get a
configuration using either ProcessJobFactory or ThreadJobFactory to start
successfully (with
app.runner.class=org.apache.samza.runtime.LocalApplicationRunner). The
application gets as far as "ZkJobCoordinator Generating new JobModel with
processors: [<ids>]" and then stops, with no errors and no indication of
what went wrong. To be clear, this only happens in 1.6.0+.

I've tried digging through the commits between 1.5.1 and 1.6.0, but nothing
stands out as an obvious cause, so I thought it worthwhile to ask the list
directly. Are there any tests that use this combination? If not, or if
support for it has been explicitly removed, what is the preferred way to
run a single instance of a StreamTask locally? I'm using Samza 1.6.0, Kafka
2.2.2, and ZooKeeper 3.4.14.

Thanks in advance for the help!

Cheers,
Malcolm McFarland
Cavulus

Here's a sample of the last few log messages, log level set to TRACE:

[INFO] Metadata Cluster ID: _6MEL_AQSBW7DpOjoOD95A
[DEBUG] Metadata Updated cluster metadata version 2 to Cluster(id =
_6MEL_AQSBW7DpOjoOD95A, nodes = [localhost:9092 (id: 0 rack: null)],
partitions = [], controller = localhost:9092 (id: 0 rack: null))
[TRACE] NetworkClient [Consumer
clientId=kafka_admin_consumer-my_streamtask-1, groupId=my-streamtask-1]
Completed receive from node -1 for METADATA with correlation id 0, received
{throttle_time_ms=0,brokers=[{node_id=0,host=localhost,port=9092,rack=null}],cluster_id=_6MEL_AQSBW7DpOjoOD95A,controller_id=0,topic_metadata=[{error_code=0,topic=default,is_internal=false,partition_metadata=[{error_code=0,partition=0,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=5,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=1,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=4,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=6,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=7,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=2,leader=0,replicas=[0],isr=[0],offline_replicas=[]},{error_code=0,partition=3,leader=0,replicas=[0],isr=[0],offline_replicas=[]}]}]}
[DEBUG] KafkaSystemAdmin Stream default has partitions [Partition(topic =
default, partition = 0, leader = 0, replicas = [0], isr = [0],
offlineReplicas = []), Partition(topic = default, partition = 5, leader =
0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic =
default, partition = 1, leader = 0, replicas = [0], isr = [0],
offlineReplicas = []), Partition(topic = default, partition = 4, leader =
0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic =
default, partition = 6, leader = 0, replicas = [0], isr = [0],
offlineReplicas = []), Partition(topic = default, partition = 7, leader =
0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic =
default, partition = 2, leader = 0, replicas = [0], isr = [0],
offlineReplicas = []), Partition(topic = default, partition = 3, leader =
0, replicas = [0], isr = [0], offlineReplicas = [])]
[INFO] KafkaSystemAdmin SystemStream partition counts for system kafka:
{default=SystemStreamMetadata [streamName=default,
partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata
[oldestOffset=null, newestOffset=null, upcomingOffset=null], Partition
[partition=1]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null], Partition
[partition=2]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null], Partition
[partition=3]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null], Partition
[partition=4]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null], Partition
[partition=5]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null], Partition
[partition=6]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null], Partition
[partition=7]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null]}]}
[DEBUG] MetricsRegistryMap Creating new gauge job-coordinator
kafka-default-partitionCount 0.
[INFO] ScheduleAfterDebounceTime Trying to cancel the action:
OnProcessorChange.
[INFO] ScheduleAfterDebounceTime Scheduled action: OnProcessorChange to run
after: 0 milliseconds.
[INFO] ZkUtils  subscribing for jm version change
at:/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet::
clientPath:null serverPath:null finished:false header:: 14,8  replyHeader::
14,832,0  request::
'/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/processors,T
 response:: v{'0000000024}
[INFO] ZkUtils Found these children - [0000000024]
[DEBUG] ZkUtils Active ProcessorZNodes in zookeeper: [0000000024].
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet::
clientPath:null serverPath:null finished:false header:: 15,3  replyHeader::
15,832,0  request::
'/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T
 response:: s{595,595,1683229357544,1683229357544,0,0,0,0,0,0,595}
[DEBUG] ZkClient Subscribed data changes for
/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet::
clientPath:null serverPath:null finished:false header:: 16,4  replyHeader::
16,832,0  request::
'/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/processors/0000000024,F
 response::
#69702d3139322d3136382d38382d38352e75732d776573742d322e636f6d707574652e696e7465726e616c2065303462363533622d323537302d343064632d396330642d353539633139386631633630,s{832,832,1683248966132,1683248966132,0,0,0,72244096555810909,80,0,832}

[INFO] ZkJobCoordinator Generating new JobModel with processors:
[e04b653b-2570-40dc-9c0d-559c198f1c60].
[DEBUG] ClientCnxn Reading reply sessionid:0x100a99f8054005d, packet::
clientPath:null serverPath:null finished:false header:: 17,4  replyHeader::
17,832,0  request::
'/app-my-streamtask-1/my-streamtask-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T
 response:: ,s{595,595,1683229357544,1683229357544,0,0,0,0,0,0,595}
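
Side note on the log above: the hex string in the reply for
processors/0000000024 is the data stored in that processor znode. Below is
a minimal sketch for reading it, assuming the payload is plain UTF-8 text
(an assumption based on how it decodes, not on anything checked in the
Samza source). The function name decode_znode_payload is made up for
illustration.

```python
def decode_znode_payload(hex_payload: str) -> str:
    """Decode a hex dump as printed in the ClientCnxn log line.

    The leading '#' is stripped if present. Assumes the bytes are
    UTF-8 text, which is an assumption rather than a documented format.
    """
    return bytes.fromhex(hex_payload.lstrip("#")).decode("utf-8")


# Payload copied verbatim from the processors/0000000024 reply in the log.
PAYLOAD = (
    "#69702d3139322d3136382d38382d38352e75732d776573742d322e636f6d70"
    "7574652e696e7465726e616c2065303462363533622d323537302d343064632d"
    "396330642d353539633139386631633630"
)

decoded = decode_znode_payload(PAYLOAD)
# The decoded text has two space-separated fields.
host, processor_id = decoded.split(" ", 1)
print(host)
print(processor_id)
```

The second field is the same ID that appears in the "Generating new
JobModel with processors" line, so the processor does appear to have
registered itself in ZooKeeper before things stop.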
