[ https://issues.apache.org/jira/browse/FLINK-20641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Echo Lee updated FLINK-20641:
-----------------------------
    Description: 
Flink version: 1.11.1

Elasticsearch connector version: 6.3.1

My job graph is [kafkaSource -> map -> elasticsearchSink]. When I set a higher 
degree of parallelism, stream processing stops. I know Elasticsearch has issue 
[47599|https://github.com/elastic/elasticsearch/issues/47599]; unexpectedly, that 
bug becomes a deadlock risk when using flink-connector-elasticsearch6.
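
For context, below is a minimal sketch of the [kafkaSource -> map -> elasticsearchSink] topology described above, assuming a plain String payload. The Kafka properties, Elasticsearch hosts, index name, parallelism and bulk-flush values are illustrative assumptions, not the job's actual configuration; only the connector APIs (FlinkKafkaConsumer, ElasticsearchSink.Builder) are taken from the Flink 1.11 connectors.
{code:java}
// Minimal sketch of the reported [kafkaSource -> map -> elasticsearchSink] job.
// Hosts, topic, group.id, index name, parallelism and bulk settings are assumptions.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class ApmSpanToEsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(60); // "a larger degree of parallelism", as in the report

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // assumption
        kafkaProps.setProperty("group.id", "apm-span");            // assumption

        DataStream<String> spans = env
                .addSource(new FlinkKafkaConsumer<>("apm-span", new SimpleStringSchema(), kafkaProps))
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value; // stand-in for the real map function
                    }
                });

        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));
        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                hosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(Requests.indexRequest()
                                .index("full_link_apm_span-2020-12-16")
                                .type("apm")
                                .source(element, XContentType.JSON));
                    }
                });

        // The sink wraps an Elasticsearch BulkProcessor; its periodic flush runs on the
        // "elasticsearch[scheduler]" thread that shows up in the thread dump below.
        builder.setBulkFlushMaxActions(1000); // illustrative value
        builder.setBulkFlushInterval(1000L);  // illustrative value

        spans.addSink(builder.build()).name("ProtoTraceLog");
        env.execute("kafkaSource -> map -> elasticsearchSink");
    }
}
{code}
Each parallel sink subtask creates its own BulkProcessor, which presumably is why a larger degree of parallelism makes the stall easier to hit.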

 

The TaskManager thread dump shows:
{code:java}
"elasticsearch[scheduler][T#1]" Id=15008 BLOCKED on 
org.elasticsearch.action.bulk.BulkProcessor@61e178a6 owned by "Sink: 
ProtoTraceLog (39/60)" Id=8781"elasticsearch[scheduler][T#1]" Id=15008 BLOCKED 
on org.elasticsearch.action.bulk.BulkProcessor@61e178a6 owned by "Sink: 
ProtoTraceLog (39/60)" Id=8781 at 
org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:366) - 
 blocked on org.elasticsearch.action.bulk.BulkProcessor@61e178a6 at 
org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:182)
 at 
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
...
 Number of locked synchronizers = 1 - 
java.util.concurrent.ThreadPoolExecutor$Worker@15659a74"

Sink: ProtoTraceLog (39/60)" Id=8781 WAITING on 
java.util.concurrent.CountDownLatch$Sync@58bbbd7c at 
sun.misc.Unsafe.park(Native Method) -  waiting on 
java.util.concurrent.CountDownLatch$Sync@58bbbd7c at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at 
org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
 at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339) 
...
{code}
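Reading the two traces together: the sink thread takes the BulkProcessor monitor on the add path (BulkProcessor.add -> internalAdd -> execute) and then waits on a CountDownLatch inside BulkRequestHandler.execute, while the single elasticsearch[scheduler] thread is blocked trying to enter the synchronized periodic BulkProcessor$Flush on the same monitor. If the completion that would count the latch down is itself queued on that same scheduler thread (for example a delayed bulk retry, which is the scenario elastic/elasticsearch#47599 describes), neither thread can make progress. The following self-contained Java model reproduces only that lock-plus-latch shape; it is an illustration, not the Elasticsearch code:
{code:java}
// Simplified model of the lock-plus-latch interaction shown in the two stack traces
// above. This is NOT the Elasticsearch source, just an illustration of the hazard.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LockPlusLatchDeadlock {

    // Single scheduler thread, analogous to "elasticsearch[scheduler][T#1]" in the dump.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    private final Object lock = new Object(); // stands in for the BulkProcessor monitor

    // Analogous to BulkProcessor.add(): called from the sink thread ("Sink: ProtoTraceLog").
    void add() throws InterruptedException {
        synchronized (lock) {                      // the caller takes the monitor...
            CountDownLatch done = new CountDownLatch(1);
            // ...and the completion that would release it is handed to the scheduler,
            // e.g. a delayed bulk retry, as described in elastic/elasticsearch#47599.
            SCHEDULER.schedule(done::countDown, 1, TimeUnit.SECONDS);
            done.await();                          // WAITING on CountDownLatch$Sync
        }
    }

    // Analogous to BulkProcessor$Flush.run(): the periodic flush also runs on SCHEDULER.
    void scheduledFlush() {
        synchronized (lock) {                      // BLOCKED on the monitor held by add()
            // flush would happen here
        }
    }

    public static void main(String[] args) throws Exception {
        LockPlusLatchDeadlock processor = new LockPlusLatchDeadlock();
        // The periodic flush ends up blocking the single scheduler thread on the lock...
        SCHEDULER.scheduleAtFixedRate(processor::scheduledFlush, 200, 200, TimeUnit.MILLISECONDS);
        processor.add(); // ...so the countDown task queued behind it never runs: deadlock.
    }
}
{code}
Running this model is expected to hang with the same BLOCKED/WAITING pair as in the dump above.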
The TaskManager log shows:
{code:java}
2020-12-16 14:36:35,291 ERROR xxx.ActionRequestFailureHandler [] - Sink to es exception, exceptionData: index {[full_link_apm_span-2020-12-16][apm][null], source[n/a, actual length: [5.8kb], max length: 2kb]}, exceptionStackTrace: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
    at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
    at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
    at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
    at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
    at com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:59)
    at com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:47)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
{code}
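The ERROR above is logged by the job's own failure handler (the xxx.ActionRequestFailureHandler in the log line) after the bulk add was interrupted while waiting on that latch. For reference, here is a hypothetical handler of that shape built on the connector's ActionRequestFailureHandler interface; the logging and re-queue policy are assumptions, not the reporter's implementation.
{code:java}
// Hypothetical failure handler in the shape suggested by the log line above.
// Only the ActionRequestFailureHandler interface is from the connector; the
// logging and re-queue policy are assumptions.
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingActionRequestFailureHandler implements ActionRequestFailureHandler {

    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingActionRequestFailureHandler.class);

    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        LOG.error("Sink to es exception, exceptionData: {}", action, failure);

        if (failure instanceof EsRejectedExecutionException && action instanceof IndexRequest) {
            // Bulk queue was full: put the request back so the next bulk retries it.
            indexer.add((IndexRequest) action);
        } else {
            // Anything else (including the InterruptedException above) fails the sink.
            throw failure;
        }
    }
}
{code}
Such a handler is registered via ElasticsearchSink.Builder#setFailureHandler; it can only report or re-queue individual requests and does not address the BulkProcessor-level deadlock itself.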


> flink-connector-elasticsearch6 will deadlock
> --------------------------------------------
>
>                 Key: FLINK-20641
>                 URL: https://issues.apache.org/jira/browse/FLINK-20641
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.11.1
>            Reporter: Echo Lee
>            Priority: Major
>             Fix For: 1.13.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
