[ 
https://issues.apache.org/jira/browse/FLINK-20641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Echo Lee closed FLINK-20641.
----------------------------
    Resolution: Not A Bug

> flink-connector-elasticsearch6 will deadlock
> --------------------------------------------
>
>                 Key: FLINK-20641
>                 URL: https://issues.apache.org/jira/browse/FLINK-20641
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.11.1
>            Reporter: Echo Lee
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.15.0, 1.14.3
>
>         Attachments: jstack
>
>
> flink version: 1.11.1
> elasticsearch connector version: 6.3.1
> My job graph is [kafkaSource--> map–>elasticsearchSink], when I set a larger 
> degree of parallelism, stream processing will stop,  I know es has an issue 
> [47599|https://github.com/elastic/elasticsearch/issues/47599], this is 
> unexpectedly the risk of deadlock when using flink-connector-elasticsearch6.
>  
> TaskManager stack is:
> [link title|http://example.com/][^jstack]
>  
> TaskManager log is:
> {code:java}
> 2020-12-16 14:36:35,291 ERROR xxx.ActionRequestFailureHandler      [] - Sink 
> to es exception ,exceptionData: index {[full_link_apm_span-2020-      
> 12-16][apm][null], source[n/a, actual length: [5.8kb], max length: 2kb]} 
> ,exceptionStackTrace: java.lang.InterruptedException
> 68224     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
> 68225     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> 68226     at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> 68227     at 
> org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
> 68228     at 
> org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
> 68229     at 
> org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
> 68230     at 
> org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
> 68231     at 
> org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
> 68232     at 
> org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
> 68233     at 
> org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
> 68234     at 
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
> 68235     at 
> com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:59)
> 68236     at 
> com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:47)
> 68237     at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310)
> 68238     at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> 68239     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> 68240     at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
> 68241     at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> 68242     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> 68243     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
> 68244     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> 68245     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> 68246     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> 68247     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> 68248     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> 68249     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to