Hey folks,

I've recently been attempting to upgrade our legacy application from Samza
1.5.1 to 1.7.0. With version 1.5.1, I've had no problems running the
application with this command:

./bin/run-app.sh --config-path=path/to/file.properties

Starting in 1.6.0, this no longer seems to work. As far as I can tell, the
application starts up fully without errors and then simply shuts down, again
without any error. As far as I can tell, it runs fine on YARN. Does Samza
1.6.0+ still support running the job as a local process? I've tried this on
both OS X and Ubuntu, using Java 1.8.

Here are the relevant portions of the properties file:

task.class=com.cavulus.task.SimpleLegacyTask
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.default.system=kafka
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
job.name=simple-legacy-task
task.inputs=kafka.event-input

...plus serdes, ZooKeeper configuration, etc. Here are the last few lines of
logging output:

2022-08-29 17:19:42,842  DEBUG  [org.apache.kafka.clients.NetworkClient]  [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null)
2022-08-29 17:19:42,843  INFO   [org.apache.kafka.clients.Metadata]  Cluster ID: fwnjhL2kQayFxN0xpatT-g
2022-08-29 17:19:42,843  DEBUG  [org.apache.kafka.clients.Metadata]  Updated cluster metadata version 2 to Cluster(id = fwnjhL2kQayFxN0xpatT-g, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null))
2022-08-29 17:19:42,843  DEBUG  [org.apache.samza.system.kafka.KafkaSystemAdmin]  Stream simple-legacy-task-broadcast-stream has partitions [Partition(topic = simple-legacy-task-broadcast-stream, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.clients.NetworkClient]  [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Initiating connection to node localhost:9092 (id: 0 rack: null)
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.metrics.Metrics]  Added sensor with name node-0.bytes-sent
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.metrics.Metrics]  Added sensor with name node-0.bytes-received
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.metrics.Metrics]  Added sensor with name node-0.latency
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.network.Selector]  [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.clients.NetworkClient]  [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Completed connection to node 0. Fetching API versions.
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.clients.NetworkClient]  [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Initiating API versions fetch from node 0.
2022-08-29 17:19:42,845  DEBUG  [org.apache.kafka.clients.NetworkClient]  [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1, groupId=simple-legacy-task-1] Recorded API versions for node 0: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 11 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 8 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 7 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 5 [usable: 3], Heartbeat(12): 0 to 3 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 3 [usable: 2], DescribeGroups(15): 0 to 3 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 3 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0, UNKNOWN(44): 0)
2022-08-29 17:19:42,846  DEBUG  [org.apache.samza.system.kafka.KafkaSystemAdmin]  Stream event-input has partitions [Partition(topic = event-input, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,846  INFO   [org.apache.samza.system.kafka.KafkaSystemAdmin]  SystemStream partition counts for system kafka: {event-input=SystemStreamMetadata [streamName=event-input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}], simple-legacy-task-broadcast-stream=SystemStreamMetadata [streamName=simple-legacy-task-broadcast-stream, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}]}
2022-08-29 17:19:42,850  DEBUG  [org.apache.samza.metrics.MetricsRegistryMap]  Creating new gauge job-coordinator kafka-event-input-partitionCount 0.
2022-08-29 17:19:42,850  DEBUG  [org.apache.samza.metrics.MetricsRegistryMap]  Creating new gauge job-coordinator kafka-simple-legacy-task-broadcast-stream-partitionCount 0.
2022-08-29 17:19:42,851  INFO   [org.apache.samza.zk.ScheduleAfterDebounceTime]  Trying to cancel the action: OnProcessorChange.
2022-08-29 17:19:42,852  INFO   [org.apache.samza.zk.ScheduleAfterDebounceTime]  Scheduled action: OnProcessorChange to run after: 20000 milliseconds.
2022-08-29 17:19:42,852  INFO   [org.apache.samza.zk.ZkUtils]   subscribing for jm version change at:/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
2022-08-29 17:19:42,853  DEBUG  [org.apache.zookeeper.ClientCnxn]  Reading reply sessionid:0x1000479a11c0066, packet:: clientPath:null serverPath:null finished:false header:: 14,3  replyHeader:: 14,265013,0  request:: '/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T  response:: s{176584,263880,1630709112307,1661453291684,84,0,0,0,2,0,176584}
2022-08-29 17:19:42,853  DEBUG  [org.I0Itec.zkclient.ZkClient]  Subscribed data changes for /app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion

At that point, the application silently exits.
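One cheap thing to rule out is that the run is being handed something other than the properties shown above. Here is a minimal sketch of that check, assuming nothing beyond plain java.util.Properties (the class SamzaConfigCheck is hypothetical and not part of Samza). It loads the same file and prints the keys from the excerpt, flagging any that are missing. It sticks to Java 8 APIs, since the app runs on Java 1.8:

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Samza: loads a .properties file with
// plain java.util.Properties and reports the keys from the excerpt above.
public class SamzaConfigCheck {

    // Keys copied from the properties excerpt; serde/ZooKeeper keys could be added.
    static final List<String> KEYS = Arrays.asList(
            "task.class",
            "job.factory.class",
            "job.default.system",
            "systems.kafka.samza.factory",
            "job.name",
            "task.inputs");

    // Maps each key to its value, or to "<missing>" if the source does not define it.
    static Map<String, String> relevantKeys(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (String key : KEYS) {
            result.put(key, props.getProperty(key, "<missing>"));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java SamzaConfigCheck path/to/file.properties
        try (Reader reader = new FileReader(args[0])) {
            relevantKeys(reader).forEach((key, value) -> System.out.println(key + " = " + value));
        }
    }
}
```

Pointing it at the same path passed to --config-path shows whether every expected key is present with the expected value. A `<missing>` entry or an unexpected job.factory.class would suggest a config-loading problem rather than a runtime one.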

Thanks in advance for any advice, ideas, things to check, etc.

Cheers,
Malcolm McFarland
Cavulus
