[
https://issues.apache.org/jira/browse/SAMZA-1371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459203#comment-16459203
]
Zihao Zhang commented on SAMZA-1371:
------------------------------------
Hi [~nickpan47]
I'm a colleague of Hao Song and I'm working on this issue now. According to
our investigation, some containers get stuck because, in the BrokerProxy
thread, the SimpleConsumer class (package kafka.consumer) used in
DefaultFetchSimpleConsumer (package org.apache.samza.system.kafka) cannot
receive any message for a given partition at a specific offset. The stuck
offset lies between the first valid offset and the latest valid offset for that
partition, so the consumer keeps sending fetch requests and trying to receive
messages in an infinite loop. The stuck offset for a given partition does not
change across multiple restarts.
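To make the loop concrete, here is a minimal Python simulation. It is not Samza's BrokerProxy code; `FetchResponse`, `make_partition` and `run_fetch_loop` are hypothetical names, and it assumes the consumer moves its next fetch offset only past messages it actually decodes. Because an empty response at an offset below the high watermark leaves the fetch offset unchanged, the same request repeats forever. The `stall_limit` check is added for illustration only, so the simulation terminates and reports the condition.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class FetchResponse:
    offsets: List[int]   # offsets of complete messages decoded from the response
    high_watermark: int  # offset one past the latest message, as reported by the broker


def make_partition(log_end: int, stuck_offset: Optional[int] = None) -> Callable[[int], FetchResponse]:
    """Hypothetical in-memory partition holding offsets 0..log_end-1.

    Fetching at `stuck_offset` returns metadata only (no messages),
    mimicking what the remote debugging session observed.
    """
    def fetch(offset: int) -> FetchResponse:
        if offset == stuck_offset or offset >= log_end:
            return FetchResponse([], log_end)
        return FetchResponse([offset], log_end)  # one message per fetch keeps the example small
    return fetch


def run_fetch_loop(fetch: Callable[[int], FetchResponse], start: int,
                   max_fetches: int, stall_limit: int) -> Tuple[int, bool]:
    """Bounded simulation of a fetch loop that only advances on returned messages.

    Returns (next_offset, stalled). `stalled` becomes True after `stall_limit`
    consecutive empty responses while next_offset is still below the high
    watermark, which is the situation the stuck containers are in.
    """
    next_offset = start
    empty_in_a_row = 0
    for _ in range(max_fetches):
        resp = fetch(next_offset)
        if resp.offsets:
            next_offset = resp.offsets[-1] + 1
            empty_in_a_row = 0
        elif next_offset < resp.high_watermark:
            # Messages should exist here, yet none came back: the same offset
            # is requested again on the next iteration.
            empty_in_a_row += 1
            if empty_in_a_row >= stall_limit:
                return next_offset, True
        else:
            empty_in_a_row = 0  # caught up with the log end; nothing to read yet
    return next_offset, False


print(run_fetch_loop(make_partition(10, stuck_offset=5), start=0, max_fetches=100, stall_limit=3))  # (5, True)
print(run_fetch_loop(make_partition(10), start=0, max_fetches=100, stall_limit=3))  # (10, False)
```

Restarting from the beginning, or from the checkpointed offset 5, lands on `(5, True)` again, which matches the observation that the stuck offset does not change across restarts.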
My remote debugging indicated that the inputStream feeding this consumer
(obtained via NetworkReceive#readFromReadableChannel()) contains only some
metadata with the topic/partition/offset info instead of a valid topic message
for that offset.
I've verified that the message at the specific offset is still valid. To do
this, I wrote a small tool to fetch messages directly from the brokers. When I
tried to fetch the message at the stuck offset, I got the same result: a
timeout before receiving any message. Then I bumped the kafka-clients version to
0.11.0.2 and tried again. This time, the consumer skipped the stuck offset and
returned the following x messages as requested.
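The probing step can be sketched as a fetch-with-deadline helper. This is a hypothetical stand-in for the tool described above, not its actual code: `probe_offset` and the injected `fetch` callable are illustrative names. With kafka-clients, such a `fetch` would typically wrap `KafkaConsumer.assign()`, `seek()` and `poll()`; here it is a plain function so the sketch runs on its own.

```python
import time
from typing import Callable, List, Optional


def probe_offset(fetch: Callable[[int], List[int]], offset: int,
                 timeout_s: float, poll_interval_s: float = 0.1) -> Optional[List[int]]:
    """Repeatedly fetch at `offset` until something comes back or the timeout expires.

    `fetch` is a hypothetical callable returning the offsets of messages read
    starting at `offset` (an empty list if none). Returns those offsets, or
    None on timeout, which is how the stuck offset showed up with the old client.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        records = fetch(offset)
        if records:
            return records
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        time.sleep(min(poll_interval_s, remaining))  # never sleep past the deadline


# Old-client behaviour at the stuck offset: nothing ever arrives.
print(probe_offset(lambda off: [], offset=5, timeout_s=0.2))  # None
# Stand-in for the behaviour reported with kafka-clients 0.11.0.2:
# later messages are returned instead of nothing.
print(probe_offset(lambda off: [off + 1, off + 2], offset=5, timeout_s=0.2))  # [6, 7]
```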
Due to a build issue, I haven't had a chance to try the newest version by
pulling from master. But based on my investigation, there's a good chance that
a newer version of the Kafka client would help solve this stuck-containers
issue. I'm wondering:
1. Do you have any other suggestions or ideas on this?
2. Do you have a timeline for releasing a new Samza version with a
kafka-clients 0.11 dependency?
Thank you.
> Some Samza Containers get stuck at "Starting BrokerProxy for
> hostname:portnum" while others seem to be fine
> -----------------------------------------------------------------------------------------------------------
>
> Key: SAMZA-1371
> URL: https://issues.apache.org/jira/browse/SAMZA-1371
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.11.0, 0.12.0
> Environment: Samza version: 0.11, 0.12
> Kafka version: 0.11.0.0
> Reporter: Ak Ka
> Priority: Blocker
> Attachments: stdout.log, thread_dump.txt
>
>
> We have multiple Samza apps using local store that have this issue. Some
> containers get stuck on "Starting BrokerProxy for hostname:portnum" while
> others seem to work as expected.
> Here is the log:
> stuck:
> ```
> [...]
> 2017-07-25 17:11:26.546 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.547 [main] org.apache.samza.system.kafka.GetOffset [INFO]
> Validating offset 0 for topic and partition
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]
> 2017-07-25 17:11:26.648 [main] org.apache.samza.system.kafka.GetOffset [INFO]
> Able to successfully read from offset 0 for topic and partition
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2].
> Using it to instantiate consumer.
> 2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Starting BrokerProxy for hostname:portnum
> // it's dead, Jim
> ```
> healthy:
> ```
> [...]
> 2017-07-25 17:11:26.920 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.921 [main] org.apache.samza.system.kafka.GetOffset [INFO]
> Validating offset 0 for topic and partition
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.GetOffset [INFO]
> Able to successfully read from offset 0 for topic and partition
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1].
> Using it to instantiate consumer.
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Starting BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.239 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer [INFO] Reconnect due
> to socket error: java.nio.channels.ClosedChannelException
> 2017-07-25 17:11:29.244 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.BrokerProxy [WARN] Restarting consumer due to
> java.nio.channels.ClosedChannelException. Releasing ownership of all
> partitions, and restarting consumer. Turn on debugging to get a full stack
> trace.
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Abdicating for
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1]
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Refreshing brokers
> for:
> Map([prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1] ->
> 13572)
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to
> interrupt.
> 2017-07-25 17:11:29.247 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.248 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.265 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to
> interrupt.
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.523 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to
> interrupt.
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.601 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to
> interrupt.
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.663 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1]
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to
> interrupt.
> 2017-07-25 17:11:29.668 [main] org.apache.samza.container.SamzaContainer
> [INFO] Starting host statistics monitor
> 2017-07-25 17:11:29.670 [main] org.apache.samza.container.SamzaContainer
> [INFO] Registering task instances with producers.
> 2017-07-25 17:11:29.674 [main] org.apache.samza.container.SamzaContainer
> [INFO] Starting producer multiplexer.
> 2017-07-25 17:11:29.675 [main] org.apache.samza.container.SamzaContainer
> [INFO] Initializing stream tasks.
> 2017-07-25 17:11:29.676 [main]
> com.company.samza.app.companyStreamingAppWrapper [INFO] Initializing instance
> of streaming application
> 2017-07-25 17:11:29.681 [main]
> com.company.samza.app.companyStreamingAppWrapper [INFO] First initialization.
> Setting up Guice container with configuration
> companyStreamingAppWrapperConfiguration{company.app.name=AlertsOrganizerInstant,
> company.appgroup=aws, company.env=prod,
> company.guice.module=com.company.notifications.Alerts.organizer..AlertsOrganizerModule}
> 2017-07-25 17:11:30.118 [main] com.company.config.guice.configModule [INFO]
> configModule loaded requested override file
> '/storage/data/secure/config/AnalyticsServiceClient.cfg'
> 2017-07-25 17:11:30.480 [main]
> com.company.samza.dataService.SamzaSessionFactoriesModule [INFO] Loading prod
> dbConfig from /data/config/prod.database.properties
> // Hibernate stuff (i.e. our code is hit)
> ```
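Comparing the stuck and healthy logs above, a stuck container's last BrokerProxy line is "Starting BrokerProxy for ..." with nothing from org.apache.samza.container.SamzaContainer after it, while a healthy one goes on to "Starting host statistics monitor" and task initialization. A minimal heuristic for spotting stuck containers across many startup logs could look like the sketch below; `looks_stuck` and the marker constants are hypothetical names chosen for this example.

```python
START_MARKER = "Starting BrokerProxy for"
# Any SamzaContainer line after BrokerProxy startup means the container moved on.
PROGRESS_MARKER = "org.apache.samza.container.SamzaContainer"


def looks_stuck(log_text: str) -> bool:
    """Heuristic: True if the last BrokerProxy startup is never followed by container progress."""
    last_start = log_text.rfind(START_MARKER)
    if last_start == -1:
        return False  # the container never reached BrokerProxy startup: a different failure
    return PROGRESS_MARKER not in log_text[last_start:]


broker_proxy_start = ("2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy "
                      "[INFO] Starting BrokerProxy for hostname:portnum\n")
stuck_log = broker_proxy_start
healthy_log = broker_proxy_start + ("2017-07-25 17:11:29.668 [main] org.apache.samza.container.SamzaContainer "
                                    "[INFO] Starting host statistics monitor\n")
print(looks_stuck(stuck_log), looks_stuck(healthy_log))  # True False
```

This only makes sense on startup logs checked after some grace period: a container that is merely slow, or one that restarts a BrokerProxy later during normal processing, would also match.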