[ 
https://issues.apache.org/jira/browse/FLINK-11942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11942:
-----------------------------------
    Priority: Minor  (was: Major)

> Flink kinesis connector throws kinesis producer daemon fatalError
> -----------------------------------------------------------------
>
>                 Key: FLINK-11942
>                 URL: https://issues.apache.org/jira/browse/FLINK-11942
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.7.2
>            Reporter: indraneel r
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> Flink connector crashes repeatedly with following error:
> {quote}437062 [kpl-callback-pool-28-thread-0] WARN 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - An 
> exception occurred while processing a record
>  java.lang.RuntimeException: Unexpected error
>          at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
>          at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513)
>          at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:183)
>          at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>          at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>          at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>          at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>          at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>          at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>          at 
> com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:37)
>          at 
> com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:33)
>          at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>          at 
> com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:33)
>          at 
> com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:13)
>          at 
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
>          at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
>          at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>          at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
>          at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
>          at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
>          at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>          at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>          at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>          at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>          at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>          at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>          at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>          at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>          at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.InterruptedException
>          at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
>          at 
> java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
>          at 
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:339)
>          at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:181)
>          ... 30 more
>  448476 [Window(EventTimeSessionWindows(1800000), ContinuousEventTimeTrigger, 
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (7/8)] INFO 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Closing 
> producer
>  448477 [Window(EventTimeSessionWindows(1800000), ContinuousEventTimeTrigger, 
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (7/8)] INFO 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Flushing 
> outstanding 294 records
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to