Hey folks, I've recently been attempting to upgrade our legacy application from Samza 1.5.1 to 1.7.0. With version 1.5.1, I've had no problems running the application with this command:
./bin/run-app.sh --config-path=path/to/file.properties

Starting in 1.6.0, this no longer seems to work. As far as I can tell, the application starts up fully without errors and then simply shuts down, again without any error. It seems to run fine on YARN. Does Samza 1.6.0+ still support running the job as a local process? I've tried this on both OS X and Ubuntu, using Java 1.8.

Here are the relevant portions of the properties file:

task.class=com.cavulus.task.SimpleLegacyTask
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.default.system=kafka
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
job.name=simple-legacy-task
task.inputs=kafka.event-input

...plus serdes, ZooKeeper configuration, etc.

Here are the last few lines of logging output:

2022-08-29 17:19:42,842 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null)
2022-08-29 17:19:42,843 INFO [org.apache.kafka.clients.Metadata] Cluster ID: fwnjhL2kQayFxN0xpatT-g
2022-08-29 17:19:42,843 DEBUG [org.apache.kafka.clients.Metadata] Updated cluster metadata version 2 to Cluster(id = fwnjhL2kQayFxN0xpatT-g, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null))
2022-08-29 17:19:42,843 DEBUG [org.apache.samza.system.kafka.KafkaSystemAdmin] Stream simple-legacy-task-broadcast-stream has partitions [Partition(topic = simple-legacy-task-broadcast-stream, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Initiating connection to node localhost:9092 (id: 0 rack: null)
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-sent
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-received
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.latency
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.network.Selector] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Completed connection to node 0. Fetching API versions.
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Initiating API versions fetch from node 0.
2022-08-29 17:19:42,845 DEBUG [org.apache.kafka.clients.NetworkClient] [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Recorded API versions for node 0: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 11 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 8 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 7 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 5 [usable: 3], Heartbeat(12): 0 to 3 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 3 [usable: 2], DescribeGroups(15): 0 to 3 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 3 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0, UNKNOWN(44): 0)
2022-08-29 17:19:42,846 DEBUG [org.apache.samza.system.kafka.KafkaSystemAdmin] Stream event-input has partitions [Partition(topic = event-input, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,846 INFO [org.apache.samza.system.kafka.KafkaSystemAdmin] SystemStream partition counts for system kafka: {event-input=SystemStreamMetadata [streamName=event-input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}], simple-legacy-task-broadcast-stream=SystemStreamMetadata [streamName=simple-legacy-task-broadcast-stream, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}]}
2022-08-29 17:19:42,850 DEBUG [org.apache.samza.metrics.MetricsRegistryMap] Creating new gauge job-coordinator kafka-event-input-partitionCount 0.
2022-08-29 17:19:42,850 DEBUG [org.apache.samza.metrics.MetricsRegistryMap] Creating new gauge job-coordinator kafka-simple-legacy-task-broadcast-stream-partitionCount 0.
2022-08-29 17:19:42,851 INFO [org.apache.samza.zk.ScheduleAfterDebounceTime] Trying to cancel the action: OnProcessorChange.
2022-08-29 17:19:42,852 INFO [org.apache.samza.zk.ScheduleAfterDebounceTime] Scheduled action: OnProcessorChange to run after: 20000 milliseconds.
2022-08-29 17:19:42,852 INFO [org.apache.samza.zk.ZkUtils] subscribing for jm version change at:/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
2022-08-29 17:19:42,853 DEBUG [org.apache.zookeeper.ClientCnxn] Reading reply sessionid:0x1000479a11c0066, packet:: clientPath:null serverPath:null finished:false header:: 14,3 replyHeader:: 14,265013,0 request:: '/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T response:: s{176584,263880,1630709112307,1661453291684,84,0,0,0,2,0,176584}
2022-08-29 17:19:42,853 DEBUG [org.I0Itec.zkclient.ZkClient] Subscribed data changes for /app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion

At that point, the application silently exits.

Thanks in advance for any advice, ideas, things to check, etc.

Cheers,
Malcolm McFarland
Cavulus