Yi Pan (Data Infrastructure) created SAMZA-1088:
---------------------------------------------------
Summary: TestStreamProcessor hangs during job initialization because the Kafka broker is not available
Key: SAMZA-1088
URL: https://issues.apache.org/jira/browse/SAMZA-1088
Project: Samza
Issue Type: Bug
Affects Versions: 0.12.0
Reporter: Yi Pan (Data Infrastructure)
On my Mac build, TestStreamProcessor hangs with the following log output:
{code}
01:35:13.680 [DEBUG] [TestEventLogger]
org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor
STANDARD_OUT
01:35:13.680 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.678
[ZkClient-EventThread-28-127.0.0.1:53690]
PartitionStateMachine$TopicChangeListener [INFO] [TopicChangeListener on
Controller 0]: New topics: [Set()], deleted topics: [Set()], new partition
replica assignment [Map()]
01:35:13.683 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.683
[ZkClient-EventThread-28-127.0.0.1:53690]
ZookeeperLeaderElector$LeaderChangeListener [INFO] New leader is 0
01:35:13.686 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.686
[ZkClient-EventThread-28-127.0.0.1:53690]
PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered
{"version":1,"partitions":{"0":[0]}} for path /brokers/topics/numbers
01:35:13.691 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.690
[ZkClient-EventThread-28-127.0.0.1:53690]
PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered
{"version":1,"partitions":{"0":[0]}} for path /brokers/topics/output
01:35:13.693 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.693
[ZkClient-EventThread-28-127.0.0.1:53690]
ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
Controller 0]: Broker change listener fired for path /brokers/ids with
children 0
01:35:13.697 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.696
[ZkClient-EventThread-28-127.0.0.1:53690]
ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
Controller 0]: Newly added brokers: , deleted brokers: , all live brokers: 0
01:35:17.678 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.677 [Test
worker] VerifiableProperties [INFO] Verifying properties
01:35:17.678 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test
worker] VerifiableProperties [INFO] Property client.id is overridden to
samza_admin-test_job-1
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test
worker] VerifiableProperties [INFO] Property metadata.broker.list is overridden
to :53693
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test
worker] VerifiableProperties [INFO] Property request.timeout.ms is overridden
to 30000
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test
worker] ClientUtils$ [INFO] Fetching metadata from broker
BrokerEndPoint(0,,53693) with correlation id 0 for
1 topic(s) Set(numbers)
01:35:17.680 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test
worker] SyncProducer [INFO] Connected to :53693 for producing
01:35:17.681 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test
worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.683 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test
worker] ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for
topics [Set(numbers)] from broker [BrokerEndPoint(0,,53693)] failed
01:35:17.683 [DEBUG] [TestEventLogger]
java.nio.channels.ClosedChannelException
01:35:17.683 [DEBUG] [TestEventLogger] at
kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
01:35:17.683 [DEBUG] [TestEventLogger] at
kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
01:35:17.683 [DEBUG] [TestEventLogger] at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
01:35:17.683 [DEBUG] [TestEventLogger] at
kafka.producer.SyncProducer.send(SyncProducer.scala:124)
01:35:17.683 [DEBUG] [TestEventLogger] at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
01:35:17.683 [DEBUG] [TestEventLogger] at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
01:35:17.683 [DEBUG] [TestEventLogger] at
org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
01:35:17.683 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:373)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:155)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:154)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:153)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:147)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:67)
01:35:17.684 [DEBUG] [TestEventLogger] at
org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger] at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at
scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
01:35:17.685 [DEBUG] [TestEventLogger] at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
01:35:17.685 [DEBUG] [TestEventLogger] at
org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger] at
org.apache.samza.coordinator.JobModelManager$.getInputStreamPartitions(JobModelManager.scala:143)
01:35:17.685 [DEBUG] [TestEventLogger] at
org.apache.samza.coordinator.JobModelManager$.getMatchedInputStreamPartitions(JobModelManager.scala:154)
01:35:17.685 [DEBUG] [TestEventLogger] at
org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobModelManager.scala:193)
01:35:17.685 [DEBUG] [TestEventLogger] at
org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobModelManager.scala:125)
01:35:17.686 [DEBUG] [TestEventLogger] at
org.apache.samza.standalone.StandaloneJobCoordinator.<init>(StandaloneJobCoordinator.java:108)
01:35:17.686 [DEBUG] [TestEventLogger] at
org.apache.samza.standalone.StandaloneJobCoordinatorFactory.getJobCoordinator(StandaloneJobCoordinatorFactory.java:29)
01:35:17.686 [DEBUG] [TestEventLogger] at
org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:134)
01:35:17.686 [DEBUG] [TestEventLogger] at
org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:111)
01:35:17.686 [DEBUG] [TestEventLogger] at
org.apache.samza.test.processor.TestStreamProcessor.testStreamProcessor(TestStreamProcessor.java:72)
01:35:17.686 [DEBUG] [TestEventLogger] at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.686 [DEBUG] [TestEventLogger] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.686 [DEBUG] [TestEventLogger] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.686 [DEBUG] [TestEventLogger] at
java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.686 [DEBUG] [TestEventLogger] at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
01:35:17.686 [DEBUG] [TestEventLogger] at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
01:35:17.687 [DEBUG] [TestEventLogger] at
org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
01:35:17.688 [DEBUG] [TestEventLogger] at
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
01:35:17.688 [DEBUG] [TestEventLogger] at
org.junit.runners.ParentRunner.run(ParentRunner.java:236)
01:35:17.688 [DEBUG] [TestEventLogger] at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
01:35:17.688 [DEBUG] [TestEventLogger] at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
01:35:17.688 [DEBUG] [TestEventLogger] at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
01:35:17.688 [DEBUG] [TestEventLogger] at
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
01:35:17.688 [DEBUG] [TestEventLogger] at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.688 [DEBUG] [TestEventLogger] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.688 [DEBUG] [TestEventLogger] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.689 [DEBUG] [TestEventLogger] at
java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.689 [DEBUG] [TestEventLogger] at
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.689 [DEBUG] [TestEventLogger] at
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.689 [DEBUG] [TestEventLogger] at
org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
01:35:17.689 [DEBUG] [TestEventLogger] at
org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
01:35:17.689 [DEBUG] [TestEventLogger] at
com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
01:35:17.689 [DEBUG] [TestEventLogger] at
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
01:35:17.689 [DEBUG] [TestEventLogger] at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.690 [DEBUG] [TestEventLogger] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.690 [DEBUG] [TestEventLogger] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.690 [DEBUG] [TestEventLogger] at
java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.690 [DEBUG] [TestEventLogger] at
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.690 [DEBUG] [TestEventLogger] at
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.690 [DEBUG] [TestEventLogger] at
org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
01:35:17.690 [DEBUG] [TestEventLogger] at
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
01:35:17.690 [DEBUG] [TestEventLogger] at
org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
01:35:17.690 [DEBUG] [TestEventLogger] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
01:35:17.691 [DEBUG] [TestEventLogger] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
01:35:17.691 [DEBUG] [TestEventLogger] at
java.lang.Thread.run(Thread.java:745)
01:35:17.691 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.681 [Test
worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.691 [DEBUG] [TestEventLogger]
01:35:17.691 [DEBUG] [TestEventLogger]
org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor
STANDARD_ERROR
01:35:17.691 [DEBUG] [TestEventLogger] 113333 [Test worker] WARN
org.apache.samza.system.kafka.KafkaSystemAdmin - Unable to fetch last offsets
for streams [numbers] due to kafka.common.KafkaException: fetching topic
metadata for topics [Set(numbers)] from broker
[ArrayBuffer(BrokerEndPoint(0,,53693))] failed. Retrying.
{code}
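It turns out that job initialization fails in {{fetchTopicMetadata}} for the input topic ({{numbers}}). {{KafkaSystemAdmin}} keeps retrying because the metadata request to the broker fails with a {{ClosedChannelException}}; at that moment, the broker cannot be reached. Note that the broker endpoint in the log has an empty host: {{metadata.broker.list}} is overridden to {{:53693}}, and the endpoint is logged as {{BrokerEndPoint(0,,53693)}}.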
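The stack trace shows the metadata fetch wrapped in {{ExponentialSleepStrategy.run}}, and the warning ends with "Retrying.", so the test blocks for as long as the broker stays unreachable. As a hedged illustration of one way such a loop could fail fast instead of hanging, here is a minimal Java sketch of exponential backoff bounded by a total time budget. {{BoundedBackoff}}, {{retryWithDeadline}} and its parameters are hypothetical names for this sketch; it is not Samza's {{ExponentialSleepStrategy}} API, and whether the real strategy should be bounded is a design question for the fix.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch, NOT Samza's ExponentialSleepStrategy: retry an action with
// exponential backoff, but give up once a total time budget is exhausted.
public class BoundedBackoff {

    public static <T> T retryWithDeadline(Callable<T> action, long initialDelayMs,
                                          double multiplier, long maxDelayMs,
                                          long deadlineMs) {
        long start = System.currentTimeMillis();
        long delay = initialDelayMs;
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                return action.call();
            } catch (Exception e) {
                long elapsed = System.currentTimeMillis() - start;
                // Surface the failure instead of retrying forever.
                if (elapsed + delay > deadlineMs) {
                    throw new IllegalStateException("Giving up after " + attempt
                        + " attempt(s) and " + elapsed + " ms", e);
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", ie);
                }
                // Grow the delay geometrically, capped at maxDelayMs.
                delay = Math.min((long) (delay * multiplier), maxDelayMs);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated unreachable broker: every attempt fails, so the call gives up.
        try {
            retryWithDeadline(() -> { throw new IOException("broker unreachable"); },
                              10, 2.0, 100, 200);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Simulated transient failure: succeeds on the third attempt.
        int[] calls = {0};
        String result = retryWithDeadline(() -> {
            if (++calls[0] < 3) throw new IOException("not yet");
            return "metadata";
        }, 10, 2.0, 100, 1000);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

The deadline check runs before each sleep, so the total wait never exceeds the budget by more than one attempt's duration; a permanently failing dependency then produces an exception the test can report, rather than an indefinite hang.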
Note that this only happens on a Mac, and only when running the test via Gradle:
{noformat}
./gradlew clean :samza-test:build -Dtest.single=TestStreamProcessor --debug
{noformat}
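While reproducing with the command above, a quick TCP probe can help check whether anything is actually listening on the broker port. The following is a minimal, hypothetical Java helper ({{BrokerProbe}} and {{isReachable}} are illustrative names, not part of Samza or Kafka); it assumes you substitute the broker's host and port, which in the log above appear only as {{:53693}} with an empty host. A successful TCP connect only shows that the port is open, not that the Kafka broker will answer metadata requests.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical diagnostic helper (not part of Samza or Kafka): reports whether a
// TCP connection to host:port can be opened within the given timeout.
public class BrokerProbe {

    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolvable host.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        int port;
        // A throwaway local listener on an ephemeral port stands in for a broker.
        try (ServerSocket server = new ServerSocket(0)) {
            port = server.getLocalPort();
            System.out.println("while listening: " + isReachable("127.0.0.1", port, 1000));
        }
        // Once the listener is closed, the same port should refuse connections.
        System.out.println("after close: " + isReachable("127.0.0.1", port, 1000));
    }
}
```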
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)