Hi,

Did you see that the problem starts with a Kafka exception: "Failed to send
data to Kafka: This server is not the leader for that topic-partition."? Is it
possible that you had a network issue and the producer could not find the
leader broker?
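
If the leader change turns out to be transient (for example, while a broker
restarts and a new leader is elected), it may help to let the Kafka producer
retry instead of failing on the first error. With the 0.10 Kafka client the
"retries" setting defaults to 0. The following is only a minimal sketch: the
helper name, the broker addresses, and the concrete values are assumptions for
illustration and are not taken from your job.

```scala
import java.util.Properties

// Hypothetical helper (not part of Flink or Kafka): builds producer
// properties that allow a few retries on retriable errors such as
// NotLeaderForPartitionException.
object KafkaProducerRetryConfig {

  def build(bootstrapServers: String,
            retries: Int = 3,
            retryBackoffMs: Long = 1000L): Properties = {
    val props = new Properties()
    // Broker list is a placeholder; use the brokers of your own cluster.
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Retry failed sends instead of surfacing the first error.
    props.setProperty("retries", retries.toString)
    // Wait between attempts so the client can refresh partition metadata.
    props.setProperty("retry.backoff.ms", retryBackoffMs.toString)
    // Wait for all in-sync replicas to acknowledge each write.
    props.setProperty("acks", "all")
    props
  }

  def main(args: Array[String]): Unit = {
    // Assumed broker addresses, for illustration only.
    val props = build("broker1:9092,broker2:9092")
    println(props)
  }
}
```

The resulting Properties object can be passed as the producer configuration
when constructing FlinkKafkaProducer010 (topic, serialization schema,
properties). Note that with retries enabled and more than one in-flight
request per connection, records may be written out of order after a retry.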

Best,
Stefan 

> On 20.12.2017 at 10:57, Shivam Sharma <28shivamsha...@gmail.com> wrote:
> 
> Hi,
> 
> I keep running into this issue with a Flink job on YARN.
> The job reads data from Kafka, transforms it, and writes it back to Kafka.
> 
> My build.sbt is:
> 
> val flinkVersion = "1.3.2"
> val flinkKafkaConnect = "0.10.2"
> 
> libraryDependencies ++= Seq(
>    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>    "org.apache.flink" %% "flink-table" % flinkVersion,
>    "org.json4s" %% "json4s-native" % "3.5.3",
>    "org.json4s" %% "json4s-jackson" % "3.5.3"
> 
> )
> 
> *Note: One of the nodes in our Kafka cluster goes down.*
> 
> 
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>     ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>     at DataStreamCalcRule$127.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 23 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 35 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 41 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 52 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> 2017-12-20 05:42:16,008 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
> 
> 
> Thanks
> 
> -- 
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn: https://www.linkedin.com/in/28shivamsharma
