Hi,

did you see that the failure originates from a Kafka exception, "Failed to send data to Kafka: This server is not the leader for that topic-partition."? Is it possible that you had a network issue and the producer could not find the leader broker?
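If a broker went down and partition leadership moved, one thing that may help is letting the producer retry instead of failing on the first error. As far as I recall, the Kafka 0.10 producer defaults to `retries=0`, and `NotLeaderForPartitionException` is a retriable error, so the producer can refresh its metadata and resend once retries are enabled. A minimal sketch, assuming placeholder values (not tuned recommendations) and a hypothetical helper named `ProducerRetryConfig`:

```scala
import java.util.Properties

// Hypothetical helper: builds Kafka producer properties that tolerate a
// leader change. The numeric values are illustrative assumptions only.
object ProducerRetryConfig {
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Retry failed sends a few times instead of failing immediately.
    props.setProperty("retries", "5")
    // Wait between attempts so a new leader can be elected.
    props.setProperty("retry.backoff.ms", "500")
    // One in-flight request per connection keeps ordering when retries happen.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props
  }
}
```

The resulting `Properties` object would be passed to the `FlinkKafkaProducer010` constructor that accepts a producer config. Keep in mind that retries can lead to duplicate records in the target topic.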
Best,
Stefan

> On 20.12.2017 at 10:57, Shivam Sharma <28shivamsha...@gmail.com> wrote:
>
> Hi,
>
> I keep running into this issue with my Flink job on YARN.
> Basically, I am reading data from Kafka, transforming it, and writing it
> back to Kafka.
>
> My build.sbt is:
>
> val flinkVersion = "1.3.2"
> val flinkKafkaConnect = "0.10.2"
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>   "org.apache.flink" %% "flink-table" % flinkVersion,
>   "org.json4s" %% "json4s-native" % "3.5.3",
>   "org.json4s" %% "json4s-jackson" % "3.5.3"
> )
>
> *Note: One of the nodes in our Kafka cluster goes down.*
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>     ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>     at DataStreamCalcRule$127.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 23 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 35 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 41 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 52 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> 2017-12-20 05:42:16,008 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>     ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>     at DataStreamCalcRule$127.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 23 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 35 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 41 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 52 more
>
> Thanks
>
> --
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn:- https://www.linkedin.com/in/28shivamsharma
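
To pull the innermost exception out of a long "Caused by" chain like the one quoted above, for example for logging or alerting, the chain can be walked via `getCause`. A small sketch; `RootCause` is a hypothetical helper name, not part of Flink or Kafka:

```scala
import scala.annotation.tailrec

// Hypothetical helper: follows getCause until there is no further cause.
object RootCause {
  @tailrec
  def of(t: Throwable): Throwable = {
    val cause = t.getCause
    // Stop when there is no cause, or when a throwable names itself as cause.
    if (cause == null || (cause eq t)) t else of(cause)
  }
}
```

Applied to the outer `RuntimeException` from the trace, this should return the `NotLeaderForPartitionException` at the bottom of the chain.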