Yi Pan (Data Infrastructure) created SAMZA-1088:
---------------------------------------------------

             Summary: TestStreamProcessor hangs during job initialization because 
the Kafka broker is not available
                 Key: SAMZA-1088
                 URL: https://issues.apache.org/jira/browse/SAMZA-1088
             Project: Samza
          Issue Type: Bug
    Affects Versions: 0.12.0
            Reporter: Yi Pan (Data Infrastructure)


On my Mac build, TestStreamProcessor hangs with the following output:
{code}
01:35:13.680 [DEBUG] [TestEventLogger] 
org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor 
STANDARD_OUT
01:35:13.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.678 
[ZkClient-EventThread-28-127.0.0.1:53690] 
PartitionStateMachine$TopicChangeListener [INFO] [TopicChangeListener on
 Controller 0]: New topics: [Set()], deleted topics: [Set()], new partition 
replica assignment [Map()]
01:35:13.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.683 
[ZkClient-EventThread-28-127.0.0.1:53690] 
ZookeeperLeaderElector$LeaderChangeListener [INFO] New leader is 0
01:35:13.686 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.686 
[ZkClient-EventThread-28-127.0.0.1:53690] 
PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener 
on 0]: Partition modification triggered 
{"version":1,"partitions":{"0":[0]}} for path /brokers/topics/numbers
01:35:13.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.690 
[ZkClient-EventThread-28-127.0.0.1:53690] 
PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener 
on 0]: Partition modification triggered 
{"version":1,"partitions":{"0":[0]}} for path /brokers/topics/output
01:35:13.693 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.693 
[ZkClient-EventThread-28-127.0.0.1:53690] 
ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
 Controller 0]: Broker change listener fired for path /brokers/ids with 
children 0
01:35:13.697 [DEBUG] [TestEventLogger]     2017-02-15 01:35:13.696 
[ZkClient-EventThread-28-127.0.0.1:53690] 
ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on
 Controller 0]: Newly added brokers: , deleted brokers: , all live brokers: 0
01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.677 [Test 
worker] VerifiableProperties [INFO] Verifying properties
01:35:17.678 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test 
worker] VerifiableProperties [INFO] Property client.id is overridden to 
samza_admin-test_job-1
01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test 
worker] VerifiableProperties [INFO] Property metadata.broker.list is overridden 
to :53693
01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test 
worker] VerifiableProperties [INFO] Property request.timeout.ms is overridden 
to 30000
01:35:17.679 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.678 [Test 
worker] ClientUtils$ [INFO] Fetching metadata from broker 
BrokerEndPoint(0,,53693) with correlation id 0 for
 1 topic(s) Set(numbers)
01:35:17.680 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test 
worker] SyncProducer [INFO] Connected to :53693 for producing
01:35:17.681 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test 
worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.683 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.679 [Test 
worker] ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for 
topics [Set(numbers)] from broker [BrokerEndPoint(0,,53693)] failed
01:35:17.683 [DEBUG] [TestEventLogger]     
java.nio.channels.ClosedChannelException
01:35:17.683 [DEBUG] [TestEventLogger]          at 
kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
01:35:17.683 [DEBUG] [TestEventLogger]          at 
kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
01:35:17.683 [DEBUG] [TestEventLogger]          at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
01:35:17.683 [DEBUG] [TestEventLogger]          at 
kafka.producer.SyncProducer.send(SyncProducer.scala:124)
01:35:17.683 [DEBUG] [TestEventLogger]          at 
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
01:35:17.683 [DEBUG] [TestEventLogger]          at 
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
01:35:17.683 [DEBUG] [TestEventLogger]          at 
org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
01:35:17.683 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:373)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:155)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:154)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:153)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:147)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:67)
01:35:17.684 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
org.apache.samza.coordinator.JobModelManager$.getInputStreamPartitions(JobModelManager.scala:143)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
org.apache.samza.coordinator.JobModelManager$.getMatchedInputStreamPartitions(JobModelManager.scala:154)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobModelManager.scala:193)
01:35:17.685 [DEBUG] [TestEventLogger]          at 
org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobModelManager.scala:125)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
org.apache.samza.standalone.StandaloneJobCoordinator.<init>(StandaloneJobCoordinator.java:108)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
org.apache.samza.standalone.StandaloneJobCoordinatorFactory.getJobCoordinator(StandaloneJobCoordinatorFactory.java:29)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:134)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:111)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
org.apache.samza.test.processor.TestStreamProcessor.testStreamProcessor(TestStreamProcessor.java:72)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
01:35:17.686 [DEBUG] [TestEventLogger]          at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
01:35:17.687 [DEBUG] [TestEventLogger]          at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
org.junit.runners.ParentRunner.run(ParentRunner.java:236)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.688 [DEBUG] [TestEventLogger]          at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
01:35:17.689 [DEBUG] [TestEventLogger]          at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
01:35:17.690 [DEBUG] [TestEventLogger]          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
01:35:17.691 [DEBUG] [TestEventLogger]          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
01:35:17.691 [DEBUG] [TestEventLogger]          at 
java.lang.Thread.run(Thread.java:745)
01:35:17.691 [DEBUG] [TestEventLogger]     2017-02-15 01:35:17.681 [Test 
worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.691 [DEBUG] [TestEventLogger] 
01:35:17.691 [DEBUG] [TestEventLogger] 
org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor 
STANDARD_ERROR
01:35:17.691 [DEBUG] [TestEventLogger]     113333 [Test worker] WARN 
org.apache.samza.system.kafka.KafkaSystemAdmin - Unable to fetch last offsets 
for streams [numbers] due to kafka.common.KafkaException: fetching topic 
metadata for topics [Set(numbers)] from broker 
[ArrayBuffer(BrokerEndPoint(0,,53693))] failed. Retrying.
{code}

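It turns out that job initialization fails while fetching metadata for the 
input topics. Because the connection to the broker fails, the metadata fetch 
keeps retrying and the test never proceeds; at that point the broker cannot be 
reached. Note that the broker endpoint in the log has an empty host: 
metadata.broker.list is overridden to ":53693", and the endpoint is logged as 
BrokerEndPoint(0,,53693).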
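For illustration only, here is a minimal Java sketch contrasting the unbounded retry seen in the trace (ExponentialSleepStrategy.run called from KafkaSystemAdmin.getSystemStreamPartitionCounts) with a retry that gives up after a fixed number of attempts, which would make the test fail instead of hang. BoundedBackoffRetry and its parameters are hypothetical and not part of Samza's API; the delay values in any usage are arbitrary.

{code:java}
import java.util.function.Supplier;

// Hypothetical helper, NOT part of Samza: retries an action with exponential
// back-off, but stops after maxAttempts instead of looping forever.
final class BoundedBackoffRetry {
    private BoundedBackoffRetry() {}

    static <T> T run(Supplier<T> action, int maxAttempts,
                     long initialDelayMs, double multiplier, long maxDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts: do not sleep again
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    throw new IllegalStateException("Interrupted while retrying", ie);
                }
                // Grow the delay geometrically, capped at maxDelayMs.
                delayMs = Math.min((long) (delayMs * multiplier), maxDelayMs);
            }
        }
        throw new IllegalStateException(
            "Gave up after " + maxAttempts + " attempts", lastFailure);
    }
}
{code}

With a cap like this, an unreachable broker would surface as an exception after a bounded wait rather than as an endless series of "Retrying." warnings.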

Note that this only happens on Mac, and only when running the test via Gradle:
{noformat}
./gradlew clean :samza-test:build -Dtest.single=TestStreamProcessor --debug
{noformat}





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
