[ https://issues.apache.org/jira/browse/SAMZA-560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14319158#comment-14319158 ]
Navina Ramesh commented on SAMZA-560:
-------------------------------------

After a lot of hiccups, I finally managed to add StreamAppender to hello-samza and run it. I don't see the TimeoutException, and I can see the logs flowing through the stream. [~closeuris] I am not really sure what is happening in your case :(

> StreamAppender not working after upgrading kafka producer API
> -------------------------------------------------------------
>
>                 Key: SAMZA-560
>                 URL: https://issues.apache.org/jira/browse/SAMZA-560
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Yan Fang
>            Assignee: Navina Ramesh
>
> After SAMZA-227, StreamAppender is not working. I dug into it a little but still cannot figure it out.
> It throws this exception:
> {code}
> Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
> {code}
> Log:
> {code}
> 2015-02-11 12:19:15 SamzaAppMaster$ [INFO] got container id: container_1423528474084_0023_02_000001
> 2015-02-11 12:19:15 KafkaSystemProducer [TRACE] Enqueueing message: log4j-log, OutgoingMessageEnvelope [systemStream=SystemStream [system=kafka2, stream=__samza_printout_task1_1_logs], keySerializerName=null, messageSerializerName=null, partitionKey=[B@155a6bd1, key=[B@155a6bd1, message=[B@635c714a].
> 2015-02-11 12:19:15 KafkaSystemProducer [INFO] Creating a new producer for system kafka2.
> 2015-02-11 12:19:15 ProducerConfig [INFO] ProducerConfig values:
> 	block.on.buffer.full = true
> 	retry.backoff.ms = 100
> 	buffer.memory = 33554432
> 	batch.size = 16384
> 	metrics.sample.window.ms = 30000
> 	metadata.max.age.ms = 300000
> 	receive.buffer.bytes = 32768
> 	timeout.ms = 30000
> 	max.in.flight.requests.per.connection = 1
> 	metric.reporters = []
> 	bootstrap.servers = [localhost:9092]
> 	client.id = samza_producer-printout_task1-1-1423685955347-0
> 	compression.type = none
> 	retries = 2147483647
> 	max.request.size = 1048576
> 	send.buffer.bytes = 131072
> 	acks = 1
> 	reconnect.backoff.ms = 10
> 	linger.ms = 0
> 	metrics.num.samples = 2
> 	metadata.fetch.timeout.ms = 60000
> 2015-02-11 12:19:15 KafkaProducer [TRACE] Starting the Kafka producer
> 2015-02-11 12:19:15 Metadata [DEBUG] Updated cluster metadata version 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
> 2015-02-11 12:19:15 KafkaProducer [DEBUG] Kafka producer started
> 2015-02-11 12:19:15 KafkaSystemProducer [DEBUG] Created a new producer for system kafka2.
> 2015-02-11 12:19:15 KafkaProducer [TRACE] Requesting metadata update for topic __samza_printout_task1_1_logs.
> 2015-02-11 12:19:15 Sender [DEBUG] Starting Kafka producer I/O thread.
> 2015-02-11 12:20:15 KafkaSystemProducer [TRACE] Enqueueing message: log4j-log, OutgoingMessageEnvelope [systemStream=SystemStream [system=kafka2, stream=__samza_printout_task1_1_logs], keySerializerName=null, messageSerializerName=null, partitionKey=[B@609548c3, key=[B@609548c3, message=[B@68dc2bbe].
> 2015-02-11 12:20:15 KafkaProducer [TRACE] Requesting metadata update for topic __samza_printout_task1_1_logs.
> {code}
> I also tested configuring two Kafka systems in one job, and that worked.
> I really cannot figure out why the KafkaProducer created through the StreamAppender does not work.
> Any ideas?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
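For reference, a minimal standalone sketch of the new-producer code path that the log above exercises: {{send()}} blocks until partition metadata for the topic is available, and gives up after {{metadata.fetch.timeout.ms}} (60000 ms in the config dump) with the same {{Failed to update metadata}} TimeoutException. This is not code from the issue; the broker address and serializer settings are assumptions (the topic name is copied from the log), and exact config keys vary slightly across early 0.8.2 builds of the new producer API.

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MetadataTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("metadata.fetch.timeout.ms", "60000");     // same value as in the ProducerConfig dump
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        try {
            // send() first waits for metadata for the topic; if the broker never
            // returns partition info, this surfaces as
            // org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
            producer.send(new ProducerRecord<>("__samza_printout_task1_1_logs",
                    "key".getBytes(), "message".getBytes())).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
{code}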