haibo-duan opened a new issue, #9227: URL: https://github.com/apache/inlong/issues/9227
### What happened

When I used Elasticsearch as the data node, the configuration process encountered the following error:

[ ] 2023-11-07 08:19:08.132 - INFO [inlong-mq-process-0] a.i.m.s.r.s.e.ElasticsearchApi:166 - create test_pulsar_table_es:true
[ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0] .i.m.s.s.StreamSinkServiceImpl:518 - success to update sink status=130 for id=1 with log: success to create es resource
[ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0] .ElasticsearchResourceOperator:111 - success to create es resource for sinkInfo=SinkInfo(id=1, inlongGroupId=test_pulsar_group, inlongStreamId=test_pulsar_stream, sinkType=ELASTICSEARCH, inlongClusterName=null, sinkName=test_pulsar_stream_sink, dataNodeName=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d, description=null, enableCreateResource=1, extParams={"hosts":"http://elasticsearch:9200","username":"admin","password":"******","indexName":"test_pulsar_table_es","flushInterval":null,"flushRecord":1000,"retryTimes":null,"keyFieldNames":null,"documentType":"test_type","primaryKey":"name","esVersion":7,"encryptVersion":1,"properties":{}}, status=100, creator=admin, mqResource=test_pulsar_stream, dataType=CSV, sourceSeparator=124, dataEscapeChar=null)
[ ] 2023-11-07 08:19:08.139 -ERROR [inlong-mq-process-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=3, processName=CREATE_STREAM_RESOURCE, processDisplayName=Create Stream, inlongGroupId=test_pulsar_group, taskId=4, elementName=InitSink, elementDisplayName=Stream-InitSink, eventType=TaskEvent, event=COMPLETE, listener=StreamSinkResourceListener, startTime=Tue Nov 07 08:19:07 UTC 2023, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=find no proper cluster assign to group=test_pulsar_group, stream=test_pulsar_stream, sink type=ELASTICSEARCH, data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d ) error:
java.lang.IllegalArgumentException: find no proper cluster assign to group=test_pulsar_group, stream=test_pulsar_stream, sink type=ELASTICSEARCH, data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d
    at org.apache.inlong.manager.common.util.Preconditions.expectNotBlank(Preconditions.java:143) ~[manager-common-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator.assignCluster(AbstractStandaloneSinkResourceOperator.java:58) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.resource.sink.es.ElasticsearchResourceOperator.createSinkResource(ElasticsearchResourceOperator.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener.listen(StreamSinkResourceListener.java:95) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$0(QueueResourceListener.java:159) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
[ ] 2023-11-07 08:19:08.154 - INFO [inlong-mq-process-0] .m.s.s.InlongStreamServiceImpl:725 - success to update stream after approve for groupId=test_pulsar_group, streamId=test_pulsar_stream
[ ] 2023-11-07 08:19:08.160 -ERROR [inlong-mq-process-0] .m.s.l.q.QueueResourceListener:168 - failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
[ ] 2023-11-07 08:19:08.160 -ERROR [inlong-workflow-0] .m.s.l.q.QueueResourceListener:179 - failed to execute stream process in asynchronously
java.util.concurrent.ExecutionException: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_342]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[?:1.8.0_342]
    at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:176) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
Caused by: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
    at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$1(QueueResourceListener.java:169) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_342]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_342]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_342]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_342]
    ... 3 more
[ ] 2023-11-07 08:19:08.161 -ERROR [inlong-workflow-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=2, processName=CREATE_GROUP_RESOURCE, processDisplayName=Create Group, inlongGroupId=test_pulsar_group, taskId=2, elementName=InitMQ, elementDisplayName=Group-InitMQ, eventType=TaskEvent, event=COMPLETE, listener=QueueResourceListener, startTime=Tue Nov 07 08:19:07 UTC 2023, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream) error:
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_pulsar_group streamId=test_pulsar_stream
    at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:180) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79) ~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
[ ] 2023-11-07 08:19:08.169 - INFO [inlong-workflow-0] .s.l.g.InitGroupFailedListener:62 - begin to execute InitGroupFailedListener for groupId=test_pulsar_group
[ ] 2023-11-07 08:19:08.170 - INFO [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:446 - begin to update group status to [120] for groupId=test_pulsar_group by user=admin

### What you expected to happen

Use ES sink

### How to reproduce

Use ES sink

### Environment

Ubuntu 22.04.3 LTS

### InLong version

master

### InLong Component

InLong Manager

### Are you willing to submit PR?

- [X] Yes, I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@inlong.apache.org.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org