[ https://issues.apache.org/jira/browse/FLINK-11942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-11942: ----------------------------------- Priority: Minor (was: Major) > Flink kinesis connector throws kinesis producer daemon fatalError > ----------------------------------------------------------------- > > Key: FLINK-11942 > URL: https://issues.apache.org/jira/browse/FLINK-11942 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis > Affects Versions: 1.7.2 > Reporter: indraneel r > Priority: Minor > Labels: auto-deprioritized-major > > Flink connector crashes repeatedly with following error: > {quote}437062 [kpl-callback-pool-28-thread-0] WARN > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - An > exception occurred while processing a record > java.lang.RuntimeException: Unexpected error > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:183) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:37) > at > com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:33) > at scala.collection.immutable.Stream.foreach(Stream.scala:594) > at > com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:33) > at > com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:13) > at > org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220) > at > java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) > at > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:339) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:181) > ... 30 more > 448476 [Window(EventTimeSessionWindows(1800000), ContinuousEventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (7/8)] INFO > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Closing > producer > 448477 [Window(EventTimeSessionWindows(1800000), ContinuousEventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (7/8)] INFO > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Flushing > outstanding 294 records > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)