chenxu8989 opened a new issue, #11969: URL: https://github.com/apache/inlong/issues/11969
### What happened # 安装方式 * Docker Compose # 安装参考文档 https://inlong.apache.org/zh-CN/docs/deployment/docker # 执行的操作 [Kafka 示例](https://inlong.apache.org/zh-CN/docs/quick_start/data_ingestion/mysql_kafka_clickhouse_example) # 异常信息 <img width="1040" height="621" alt="Image" src="https://github.com/user-attachments/assets/b265ed4b-2986-4f49-8375-657e7f20cadc" /> ```log [ ] 2025-08-15 10:03:13.977 - INFO [inlong-mq-process-0] o.a.i.m.s.c.i.AuditServiceImpl:310 - success to query audit proxy url list for request url =http://audit:10080/audit/query/getAuditProxy?component=Sort [ ] 2025-08-15 10:03:14.254 - INFO [inlong-mq-process-0] s.l.s.StreamSortConfigListener:133 - success to build sort config for groupId=test_kafka_group, streamId=test_kafka_stream [ ] 2025-08-15 10:03:14.292 - INFO [inlong-mq-process-0] .i.m.p.l.StartupStreamListener:74 - inlong stream :test_kafka_stream ext info: [InlongStreamExtInfo(id=null, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, keyName=dataflow, keyValue={"groupId":"test_kafka_group","streams":[{"streamId":"test_kafka_stream","nodes":[{"type":"kafkaExtract","id":"test_kafka_stream","name":"test_kafka_stream","fields":[{"type":"field","name":"id","nodeId":"test_kafka_stream","formatInfo":{"type":"int"}},{"type":"field","name":"name","nodeId":"test_kafka_stream","formatInfo":{"type":"string"}}],"properties":{"metrics.audit.key":"9","metrics.audit.proxy.hosts":"audit:10081"},"topic":"test_kafka_group.test_kafka_stream","bootstrapServers":"kafka:9092","format":{"type":"inLongMsgFormat","innerFormat":{"type":"debeziumJsonFormat","schemaInclude":false,"ignoreParseErrors":true,"timestampFormatStandard":"SQL","mapNullKeyMode":"DROP","mapNullKeyLiteral":"null","encodeDecimalAsPlainNumb 
er":true},"ignoreParseErrors":false},"scanStartupMode":"EARLIEST_OFFSET","groupId":"default_cluster_test_kafka_group.test_kafka_stream_consumer_group"},{"type":"clickHouseLoad","id":"test_kafka_sink","name":"test_kafka_sink","fields":[{"type":"field","name":"id","nodeId":"test_kafka_sink","formatInfo":{"type":"long"}},{"type":"field","name":"name","nodeId":"test_kafka_sink","formatInfo":{"type":"string"}}],"fieldRelations":[{"type":"fieldRelation","inputField":{"type":"field","name":"id","formatInfo":{"type":"int"}},"outputField":{"type":"field","name":"id","formatInfo":{"type":"long"}}},{"type":"fieldRelation","inputField":{"type":"field","name":"name","formatInfo":{"type":"string"}},"outputField":{"type":"field","name":"name","formatInfo":{"type":"string"}}}],"properties":{"metrics.audit.key":"10","metrics.audit.proxy.hosts":"audit:10081"},"tableName":"test_ck_db.test_ck_table","url":"jdbc:clickhouse://clickhouse:8123/test_ck_db","userName":"admin","passWord":"******","primaryKey":"id"}],"relations":[{"type":"baseRelation","inputs":["test_kafka_stream"],"outputs":["test_kafka_sink"]}]}],"properties":{}})] [ ] 2025-08-15 10:03:14.349 - INFO [inlong-mq-process-0] o.a.i.m.p.u.FlinkUtils :277 - stream ext info: [InlongStreamExtInfo(id=null, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, keyName=dataflow, 
keyValue={"groupId":"test_kafka_group","streams":[{"streamId":"test_kafka_stream","nodes":[{"type":"kafkaExtract","id":"test_kafka_stream","name":"test_kafka_stream","fields":[{"type":"field","name":"id","nodeId":"test_kafka_stream","formatInfo":{"type":"int"}},{"type":"field","name":"name","nodeId":"test_kafka_stream","formatInfo":{"type":"string"}}],"properties":{"metrics.audit.key":"9","metrics.audit.proxy.hosts":"audit:10081"},"topic":"test_kafka_group.test_kafka_stream","bootstrapServers":"kafka:9092","format":{"type":"inLongMsgFormat","innerFormat":{"type":"debeziumJsonFormat","schemaInclude":false,"ignoreParseErrors":true,"timestampFormatStandard":"SQL","mapNullKeyMode":"DROP","mapNullKeyLiteral":"null","encodeDecimalAsPlainNumber":true},"ignoreParseErrors":false},"scanStartupMode":"EARLIEST_OFFSET","groupId":"default_cluster_test_kafka_group.test_kafka_stream_consumer_group"},{"type":"clickHouseLoad","id":"test_kafka_sink","name":"test_kafka_sink","fields":[{"type":"field","name":"id","nodeId":"test_kafka_sink","formatInfo":{"type":"long"}},{"type":"field","name":"name","nodeId":"test_kafka_sink","formatInfo":{"type":"string"}}],"fieldRelations":[{"type":"fieldRelation","inputField":{"type":"field","name":"id","formatInfo":{"type":"int"}},"outputField":{"type":"field","name":"id","formatInfo":{"type":"long"}}},{"type":"fieldRelation","inputField":{"type":"field","name":"name","formatInfo":{"type":"string"}},"outputField":{"type":"field","name":"name","formatInfo":{"type":"string"}}}],"properties":{"metrics.audit.key":"10","metrics.audit.proxy.hosts":"audit:10081"},"tableName":"test_ck_db.test_ck_table","url":"jdbc:clickhouse://clickhouse:8123/test_ck_db","userName":"admin","passWord":"******","primaryKey":"id"}],"relations":[{"type":"baseRelation","inputs":["test_kafka_stream"],"outputs":["test_kafka_sink"]}]}],"properties":{}})] [ ] 2025-08-15 10:03:14.405 - INFO [inlong-mq-process-0] o.a.i.m.p.f.FlinkOperation :208 - gen path from 
/opt/inlong-manager/lib/manager-plugins-base-2.2.0.jar [ ] 2025-08-15 10:03:14.406 - INFO [inlong-mq-process-0] o.a.i.m.p.f.FlinkOperation :227 - get sort jar path success, path: /opt/inlong-sort/sort-dist-2.2.0.jar [ ] 2025-08-15 10:03:14.413 - INFO [inlong-mq-process-0] o.a.i.m.p.f.FlinkOperation :254 - get sort connector paths success, paths: [/opt/inlong-sort/connectors/sort-connector-jdbc-v1.15-2.2.0.jar, /opt/inlong-sort/connectors/sort-connector-kafka-v1.15-2.2.0.jar] [ ] 2025-08-15 10:03:14.419 - INFO [inlong-mq-process-0] o.a.i.m.p.u.FlinkUtils :207 - Start to load Flink config from file: /opt/inlong-manager/plugins/flink-sort-plugin.properties [ ] 2025-08-15 10:03:14.481 -ERROR [inlong-plugin-0] .p.f.FlinkParallelismOptimizer:86 - Error retrieving data volume: null java.lang.NullPointerException: null at org.apache.inlong.manager.plugin.flink.FlinkParallelismOptimizer.getAverageDataVolume(FlinkParallelismOptimizer.java:125) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.flink.FlinkParallelismOptimizer.calculateRecommendedParallelism(FlinkParallelismOptimizer.java:83) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.flink.FlinkService.submitJobBySavepoint(FlinkService.java:226) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:179) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58) ~[manager-plugins-base-2.2.0.jar:2.2.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] [ ] 2025-08-15 
10:03:14.482 - INFO [inlong-plugin-0] .p.f.FlinkParallelismOptimizer:92 - Calculated parallelism: 0 for data volume: 0 [ ] 2025-08-15 10:03:14.482 - INFO [inlong-plugin-0] o.a.i.m.p.f.FlinkService :235 - current parallelism: 1 [ ] 2025-08-15 10:03:14.839 - INFO [inlong-operation-log-0] o.a.i.m.s.o.OperationLogPool :92 - receive 1 logs and saved cost 17 ms [ ] 2025-08-15 10:03:18.615 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :143 - start parse group, groupId:test_kafka_group [ ] 2025-08-15 10:03:18.616 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :166 - start parse stream, streamId:test_kafka_stream [ ] 2025-08-15 10:03:18.618 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :229 - start parse node relation, relation:NodeRelation(inputs=[test_kafka_stream], outputs=[test_kafka_sink]) [ ] 2025-08-15 10:03:18.619 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :292 - start parse node, node id:test_kafka_stream [ ] 2025-08-15 10:03:18.654 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :294 - node id:test_kafka_stream, create table sql: CREATE TABLE `table_test_kafka_stream`( `id` INT, `name` STRING) WITH ( 'metrics.audit.key' = '9', 'metrics.audit.proxy.hosts' = 'audit:10081', 'inlong.metric.labels' = 'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream', 'topic' = 'test_kafka_group.test_kafka_stream', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka-inlong', 'inlong-msg.debezium-json.schema-include' = 'false', 'inlong-msg.debezium-json.encode.decimal-as-plain-number' = 'true', 'inlong-msg.inner.format' = 'debezium-json', 'format' = 'inlong-msg', 'inlong-msg.debezium-json.timestamp-format.standard' = 'SQL', 'inlong-msg.debezium-json.ignore-parse-errors' = 'true', 'inlong-msg.debezium-json.map-null-key.literal' = 'null', 'inlong-msg.debezium-json.map-null-key.mode' = 'DROP', 'inlong-msg.ignore-parse-errors' = 'false', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 
'default_cluster_test_kafka_group.test_kafka_stream_consumer_group' ) [ ] 2025-08-15 10:03:18.660 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :318 - parse node success, node id:test_kafka_stream [ ] 2025-08-15 10:03:18.665 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :301 - node id:test_kafka_sink, create table sql: CREATE TABLE `test_ck_db.test_ck_table`( PRIMARY KEY (`id`) NOT ENFORCED, `id` BIGINT, `name` STRING) WITH ( 'metrics.audit.key' = '10', 'metrics.audit.proxy.hosts' = 'audit:10081', 'inlong.metric.labels' = 'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_sink', 'connector' = 'jdbc-inlong', 'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect', 'url' = 'jdbc:clickhouse://clickhouse:8123/test_ck_db', 'table-name' = 'test_ck_db.test_ck_table', 'username' = 'admin', 'password' = '******' ) [ ] 2025-08-15 10:03:18.667 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :318 - parse node success, node id:test_kafka_sink [ ] 2025-08-15 10:03:18.668 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :246 - parse node relation success, relation:NodeRelation(inputs=[test_kafka_stream], outputs=[test_kafka_sink]) [ ] 2025-08-15 10:03:18.669 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :183 - parse stream success, streamId:test_kafka_stream [ ] 2025-08-15 10:03:18.669 - INFO [inlong-plugin-0] o.a.i.s.p.i.FlinkSqlParser :147 - parse group success, groupId:test_kafka_group [ ] 2025-08-15 10:03:18.680 - INFO [inlong-plugin-0] .a.i.s.p.r.FlinkSqlParseResult:89 - execute createSql: CREATE TABLE `table_test_kafka_stream`( `id` INT, `name` STRING) WITH ( 'metrics.audit.key' = '9', 'metrics.audit.proxy.hosts' = 'audit:10081', 'inlong.metric.labels' = 'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream', 'topic' = 'test_kafka_group.test_kafka_stream', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka-inlong', 'inlong-msg.debezium-json.schema-include' = 'false', 
'inlong-msg.debezium-json.encode.decimal-as-plain-number' = 'true', 'inlong-msg.inner.format' = 'debezium-json', 'format' = 'inlong-msg', 'inlong-msg.debezium-json.timestamp-format.standard' = 'SQL', 'inlong-msg.debezium-json.ignore-parse-errors' = 'true', 'inlong-msg.debezium-json.map-null-key.literal' = 'null', 'inlong-msg.debezium-json.map-null-key.mode' = 'DROP', 'inlong-msg.ignore-parse-errors' = 'false', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'default_cluster_test_kafka_group.test_kafka_stream_consumer_group' ) [ ] 2025-08-15 10:03:19.035 - INFO [inlong-plugin-0] .a.i.s.p.r.FlinkSqlParseResult:89 - execute createSql: CREATE TABLE `test_ck_db.test_ck_table`( PRIMARY KEY (`id`) NOT ENFORCED, `id` BIGINT, `name` STRING) WITH ( 'metrics.audit.key' = '10', 'metrics.audit.proxy.hosts' = 'audit:10081', 'inlong.metric.labels' = 'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_sink', 'connector' = 'jdbc-inlong', 'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect', 'url' = 'jdbc:clickhouse://clickhouse:8123/test_ck_db', 'table-name' = 'test_ck_db.test_ck_table', 'username' = 'admin', 'password' = '******' ) [ ] 2025-08-15 10:03:19.053 - INFO [inlong-plugin-0] .a.i.s.p.r.FlinkSqlParseResult:81 - execute loadSql: INSERT INTO `test_ck_db.test_ck_table` SELECT CAST(`id` as BIGINT) AS `id`, `name` AS `name` FROM `table_test_kafka_stream` [ ] 2025-08-15 10:03:19.424 - INFO [inlong-plugin-0] s.k.t.KafkaDynamicTableFactory:170 - valueFormatPrefix is inlong-msg [ ] 2025-08-15 10:03:19.470 -ERROR [inlong-plugin-0] o.a.i.m.p.f.FlinkService :181 - submit job from info FlinkInfo(endpoint=null, jobName=InLong-Sort-test_kafka_group-test_kafka_stream, inlongStreamInfoList=[InlongStreamInfo(id=5, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, name=null, description=, mqResource=test_kafka_stream, dataType=CSV, dataEncoding=UTF-8, dataSeparator=124, dataEscapeChar=null, syncSend=0, dailyRecords=10, 
dailyStorage=10, peakRecords=1000, maxLength=10240, storagePeriod=1, extParams={"ignoreParseError":true,"kvSeparator":null,"lineSeparator":null,"useExtendedFields":false,"predefinedFields":"","sinkMultipleEnable":false,"extendedFieldSize":0}, status=110, previousStatus=100, creator=dev, modifier=admin, createTime=Fri Aug 15 01:55:56 UTC 2025, modifyTime=Fri Aug 15 02:03:12 UTC 2025, fieldList=[StreamField(id=9, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, fieldName=id, fieldType=int, fieldComment=id, isPredefinedField=0, fieldValue=null, preExpression=null, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, extParams=null), StreamField(id=10, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, fieldName=name, fieldType=string, fieldComment=name, isPredefinedField=0, fieldValue=null, preExpression=null, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, extParams=null)], extList=[InlongStreamExtInfo(id=null, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, keyName=dataflow, keyValue={"groupId":"test_kafka_group","streams":[{"streamId":"test_kafka_stream","nodes":[{"type":"kafkaExtract","id":"test_kafka_stream","name":"test_kafka_stream","fields":[{"type":"field","name":"id","nodeId":"test_kafka_stream","formatInfo":{"type":"int"}},{"type":"field","name":"name","nodeId":"test_kafka_stream","formatInfo":{"type":"string"}}],"properties":{"metrics.audit.key":"9","metrics. 
audit.proxy.hosts":"audit:10081"},"topic":"test_kafka_group.test_kafka_stream","bootstrapServers":"kafka:9092","format":{"type":"inLongMsgFormat","innerFormat":{"type":"debeziumJsonFormat","schemaInclude":false,"ignoreParseErrors":true,"timestampFormatStandard":"SQL","mapNullKeyMode":"DROP","mapNullKeyLiteral":"null","encodeDecimalAsPlainNumber":true},"ignoreParseErrors":false},"scanStartupMode":"EARLIEST_OFFSET","groupId":"default_cluster_test_kafka_group.test_kafka_stream_consumer_group"},{"type":"clickHouseLoad","id":"test_kafka_sink","name":"test_kafka_sink","fields":[{"type":"field","name":"id","nodeId":"test_kafka_sink","formatInfo":{"type":"long"}},{"type":"field","name":"name","nodeId":"test_kafka_sink","formatInfo":{"type":"string"}}],"fieldRelations":[{"type":"fieldRelation","inputField":{"type":"field","name":"id","formatInfo":{"type":"int"}},"outputField":{"type":"field","name":"id","formatInfo":{"type":"long"}}},{"type":"fieldRelation","inputField":{"type":"field","name":"name","formatInfo":{"type":"string"}},"outputField":{"type":"field","name":"name","formatInfo":{"type":"string"}}}],"properties":{"metrics.audit.key":"10","metrics.audit.proxy.hosts":"audit:10081"},"tableName":"test_ck_db.test_ck_table","url":"jdbc:clickhouse://clickhouse:8123/test_ck_db","userName":"admin","passWord":"******","primaryKey":"id"}],"relations":[{"type":"baseRelation","inputs":["test_kafka_stream"],"outputs":["test_kafka_sink"]}]}],"properties":{}})], sourceList=[MySQLBinlogSource(super=StreamSource(id=5, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, sourceType=MYSQL_BINLOG, sourceName=test_kafka_source, agentIp=null, uuid=null, inlongClusterName=null, inlongClusterNodeTag=null, dataNodeName=null, serializationType=debezium_json, snapshot=null, dataTimeZone=null, version=1, status=110, previousStatus=null, creator=dev, modifier=dev, createTime=Fri Aug 15 01:59:11 UTC 2025, modifyTime=Fri Aug 15 01:59:11 UTC 2025, properties={}, taskMapId=null, au 
ditVersion=null, dataAddTaskList=null, ignoreParseError=null), user=test_mysql_db, password=****** hostname=mysql, port=3306, serverId=0, includeSchema=null, databaseWhiteList=test_mysql_db, tableWhiteList=test_mysql_db.test_mysql_table, serverTimezone=UTC, intervalMs=1000, snapshotMode=initial, offsetFilename=null, historyFilename=/data/inlong-agent/.history, monitoredDdl=null, timestampFormatStandard=SQL, allMigration=false, onlyIncremental=true, primaryKey=null, specificOffsetFile=null, specificOffsetPos=null)], sinkList=[ClickHouseSink(super=StreamSink(super=StreamNode(preNodes=null, postNodes=null, fieldList=[StreamField(id=9, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, fieldName=id, fieldType=int, fieldComment=id, isPredefinedField=0, fieldValue=null, preExpression=null, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, extParams=null), StreamField(id=10, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, fieldName=name, fieldType=string, fieldComment=name, isPredefinedField=0, fieldValue=null, preExpression=null, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, extParams=null)]), id=5, inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, sinkType=CLICKHOUSE, sinkName=test_kafka_sink, transformSql=null, description=, inlongClusterName=null, dataNodeName=5924d85a-57bc-4d0c-a86a-dbe6350cf313, sortTaskName=null, sortConsumerGroup=null, enableCreateResource=1, enableDataArchiving=null, operateLog=success to create clickhouse resource, status=110, previousStatus=100, creator=dev, modifier=admin, createTime=Fri Aug 15 01:55:56 UTC 2025, modifyTime=Fri Aug 15 02:03:12 UTC 2025, sinkFieldList=[SinkField(id=9, sinkType=null, inlongGroupId=null, inlongStreamId=null, fieldName=id, fieldType=Int64, fieldComment=id, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, sourceFieldName=id, 
sourceFieldType=int, extParams={"sinkType":"CLICKHOUSE","id":null,"sinkType":"CLICKHOUSE","inlongGroupId":null,"inlongStreamId":null,"fieldName":"id","fieldType":"Int64","fieldComment":null,"isMetaField":0,"metaFieldName":null,"fieldFormat":null,"originNodeName":null,"originFieldName":null,"sourceFieldName":"id","sourceFieldType":"int","extParams":null,"defaultType":null,"defaultExpr":null,"compressionCode":null,"ttlExpr":null}), SinkField(id=10, sinkType=null, inlongGroupId=null, inlongStreamId=null, fieldName=name, fieldType=String, fieldComment=name, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, sourceFieldName=name, sourceFieldType=string, extParams={"sinkType":"CLICKHOUSE","id":null,"sinkType":"CLICKHOUSE","inlongGroupId":null,"inlongStreamId":null,"fieldName":"name","fieldType":"String","fieldComment":null,"isMetaField":0,"metaFieldName":null,"fieldFormat":null,"originNodeName":null,"originFieldName":null,"sourceFieldName":"name","sourceFieldType":"string","extParams":null,"defaultType":null,"defaultExpr":null,"compressionCode":null,"ttlExpr":null})], properties={metrics.audit.key=10, metrics.audit.proxy.hosts=audit:10081}, dataEncoding=UTF-8, dataFormat=NONE, authentication=null, version=1), jdbcUrl=jdbc:clickhouse://clickhouse:8123, username=admin, password=****** dbName=test_ck_db, tableName=test_ck_table, flushInterval=1, flushRecord=1000, retryTimes=3, isDistributed=0, partitionStrategy=null, partitionFields=null, keyFieldNames=null, engine=MergeTree, partitionBy=null, orderBy=null, ttl=null, ttlUnit=null, cluster=null, primaryKey=id)], version=1, wrapType=INLONG_MSG_V0, useExtendedFields=false, extendedFieldSize=0, sinkMultipleEnable=false, syncField=false, ignoreParseError=true)], localJarPath=/opt/inlong-sort/sort-dist-2.2.0.jar, connectorJarPaths=[/opt/inlong-sort/connectors/sort-connector-jdbc-v1.15-2.2.0.jar, /opt/inlong-sort/connectors/sort-connector-kafka-v1.15-2.2.0.jar], localConfPath=/ 
opt/inlong-manager/lib/InLong-Sort-test_kafka_group-test_kafka_stream, sourceType=null, sinkType=null, jobId=null, savepointPath=null, isException=false, exceptionMsg=null, runtimeExecutionMode=stream, boundaryType=null, lowerBoundary=null, upperBoundary=null) failed: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.table_test_kafka_stream'. Table options are: 'connector'='kafka-inlong' 'format'='inlong-msg' 'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true' 'inlong-msg.debezium-json.ignore-parse-errors'='true' 'inlong-msg.debezium-json.map-null-key.literal'='null' 'inlong-msg.debezium-json.map-null-key.mode'='DROP' 'inlong-msg.debezium-json.schema-include'='false' 'inlong-msg.debezium-json.timestamp-format.standard'='SQL' 'inlong-msg.ignore-parse-errors'='false' 'inlong-msg.inner.format'='debezium-json' 'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream' 'metrics.audit.key'='9' 'metrics.audit.proxy.hosts'='audit:10081' 'properties.bootstrap.servers'='kafka:9092' 'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group' 'scan.startup.mode'='earliest-offset' 'topic'='test_kafka_group.test_kafka_stream' at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-clients-1.15.4.jar:1.15.4] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-clients-1.15.4.jar:1.15.4] at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) ~[flink-clients-1.15.4.jar:1.15.4] at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82) ~[flink-clients-1.15.4.jar:1.15.4] at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117) 
~[flink-clients-1.15.4.jar:1.15.4] at org.apache.inlong.manager.plugin.flink.FlinkService.submitJobBySavepoint(FlinkService.java:244) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:179) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58) ~[manager-plugins-base-2.2.0.jar:2.2.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_test_kafka_stream'. 
Table options are: 'connector'='kafka-inlong' 'format'='inlong-msg' 'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true' 'inlong-msg.debezium-json.ignore-parse-errors'='true' 'inlong-msg.debezium-json.map-null-key.literal'='null' 'inlong-msg.debezium-json.map-null-key.mode'='DROP' 'inlong-msg.debezium-json.schema-include'='false' 'inlong-msg.debezium-json.timestamp-format.standard'='SQL' 'inlong-msg.ignore-parse-errors'='false' 'inlong-msg.inner.format'='debezium-json' 'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream' 'metrics.audit.key'='9' 'metrics.audit.proxy.hosts'='audit:10081' 'properties.bootstrap.servers'='kafka:9092' 'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group' 'scan.startup.mode'='earliest-offset' 'topic'='test_kafka_group.test_kafka_stream' at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159) ~[flink-table-common-1.15.4.jar:1.15.4] at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-common-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:353) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:763) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:322) 
~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:62) ~[flink-table-api-java-1.15.4.jar:1.15.4] at org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:45) ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4] at org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:30) ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:82) ~[?:?] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63) ~[?:?] at org.apache.inlong.sort.Entrance.main(Entrance.java:101) ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients-1.15.4.jar:1.15.4] ... 12 more Caused by: java.lang.NullPointerException at org.apache.inlong.sort.kafka.shaded.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903) ~[?:?] at org.apache.inlong.sort.kafka.shaded.com.google.common.base.Splitter.split(Splitter.java:387) ~[?:?] at org.apache.inlong.sort.kafka.shaded.com.google.common.base.Splitter$MapSplitter.split(Splitter.java:518) ~[?:?] 
at org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.util.AuditUtils.extractChangelogAuditKeyMap(AuditUtils.java:59) ~[?:?] at org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base.metric.MetricOption$Builder.build(MetricOption.java:262) ~[?:?] at org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:209) ~[?:?] at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156) ~[flink-table-common-1.15.4.jar:1.15.4] at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-common-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:353) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:763) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:322) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[flink-table-planner_2.12-1.15.4.jar:1.15.4] at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:62) ~[flink-table-api-java-1.15.4.jar:1.15.4] at org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:45) ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4] at org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:30) ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:82) ~[?:?] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63) ~[?:?] at org.apache.inlong.sort.Entrance.main(Entrance.java:101) ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients-1.15.4.jar:1.15.4] ... 12 more [ ] 2025-08-15 10:03:20.198 - WARN [inlong-plugin-0] .i.m.p.f.IntegrationTaskRunner:64 - Start job null failed in backend exception[java.lang.Exception: submit job failed: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.table_test_kafka_stream'. 
Table options are: 'connector'='kafka-inlong' 'format'='inlong-msg' 'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true' 'inlong-msg.debezium-json.ignore-parse-errors'='true' 'inlong-msg.debezium-json.map-null-key.literal'='null' 'inlong-msg.debezium-json.map-null-key.mode'='DROP' 'inlong-msg.debezium-json.schema-include'='false' 'inlong-msg.debezium-json.timestamp-format.standard'='SQL' 'inlong-msg.ignore-parse-errors'='false' 'inlong-msg.inner.format'='debezium-json' 'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream' 'metrics.audit.key'='9' 'metrics.audit.proxy.hosts'='audit:10081' 'properties.bootstrap.servers'='kafka:9092' 'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group' 'scan.startup.mode'='earliest-offset' 'topic'='test_kafka_group.test_kafka_stream' at org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:182) at org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ] [ ] 2025-08-15 10:03:20.216 - INFO [inlong-mq-process-0] o.a.i.m.p.u.FlinkUtils :316 - job submit success for groupId = test_kafka_group, streamId = test_kafka_stream, jobId = null [ ] 2025-08-15 10:03:20.217 -ERROR [inlong-mq-process-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=19, processName=CREATE_STREAM_RESOURCE, processDisplayName=Create Stream, inlongGroupId=test_kafka_group, taskId=37, elementName=InitSort, elementDisplayName=Stream-InitSort, eventType=TaskEvent, event=COMPLETE, listener=StartupStreamListener, startTime=Fri Aug 15 
10:03:14 UTC 2025, endTime=null, status=-1, async=0, ip=172.21.0.8, remark=null, exception=startup failed: Start job null failed in backend exception[java.lang.Exception: submit job failed: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.table_test_kafka_stream'. Table options are: 'connector'='kafka-inlong' 'format'='inlong-msg' 'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true' 'inlong-msg.debezium-json.ignore-parse-errors'='true' 'inlong-msg.debezium-json.map-null-key.literal'='null' 'inlong-msg.debezium-json.map-null-key.mode'='DROP' 'inlong-msg.debezium-json.schema-include'='false' 'inlong-msg.debezium-json.timestamp-format.standard'='SQL' 'inlong-msg.ignore-parse-errors'='false' 'inlong-msg.inner.format'='debezium-json' 'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream' 'metrics.audit.key'='9' 'metrics.audit.proxy.hosts'='audit:10081' 'properties.bootstrap.servers'='kafka:9092' 'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group' 'scan.startup.mode'='earliest-offset' 'topic'='test_kafka_group.test_kafka_stream' at org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:182) at org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ]) error: org.apache.inlong.manager.common.exceptions.BusinessException: startup failed: Start job null failed in backend exception[java.lang.Exception: submit job failed: The main method caused an error: Unable to create a source for reading table 
'default_catalog.default_database.table_test_kafka_stream'. Table options are: 'connector'='kafka-inlong' 'format'='inlong-msg' 'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true' 'inlong-msg.debezium-json.ignore-parse-errors'='true' 'inlong-msg.debezium-json.map-null-key.literal'='null' 'inlong-msg.debezium-json.map-null-key.mode'='DROP' 'inlong-msg.debezium-json.schema-include'='false' 'inlong-msg.debezium-json.timestamp-format.standard'='SQL' 'inlong-msg.ignore-parse-errors'='false' 'inlong-msg.inner.format'='debezium-json' 'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream' 'metrics.audit.key'='9' 'metrics.audit.proxy.hosts'='audit:10081' 'properties.bootstrap.servers'='kafka:9092' 'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group' 'scan.startup.mode'='earliest-offset' 'topic'='test_kafka_group.test_kafka_stream' at org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:182) at org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ] at org.apache.inlong.manager.plugin.flink.FlinkOperation.pollJobStatus(FlinkOperation.java:308) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJob(FlinkUtils.java:330) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJob(FlinkUtils.java:262) ~[manager-plugins-base-2.2.0.jar:2.2.0] at org.apache.inlong.manager.plugin.listener.StartupStreamListener.listen(StartupStreamListener.java:77) ~[manager-plugins-base-2.2.0.jar:2.2.0] at 
org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-2.2.0.jar:2.2.0] at 
org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$0(QueueResourceListener.java:165) ~[manager-service-2.2.0.jar:2.2.0] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] [ ] 2025-08-15 10:03:20.264 - INFO [inlong-mq-process-0] .m.s.s.InlongStreamServiceImpl:669 - success to update stream after approve for groupId=test_kafka_group, streamId=test_kafka_stream [ ] 2025-08-15 10:03:20.270 -ERROR [inlong-mq-process-0] .m.s.l.q.QueueResourceListener:174 - failed to start stream process for groupId=test_kafka_group streamId=test_kafka_stream [ ] 2025-08-15 10:03:20.272 -ERROR [inlong-workflow-0] .m.s.l.q.QueueResourceListener:185 - failed to execute stream process in asynchronously java.util.concurrent.ExecutionException: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_kafka_group streamId=test_kafka_stream at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[?:1.8.0_342] at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:182) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:140) 
~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:81) ~[manager-service-2.2.0.jar:2.2.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at 
java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] Caused by: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_kafka_group streamId=test_kafka_stream at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$1(QueueResourceListener.java:175) ~[manager-service-2.2.0.jar:2.2.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_342] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_342] ... 3 more [ ] 2025-08-15 10:03:20.274 -ERROR [inlong-workflow-0] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=18, processName=CREATE_GROUP_RESOURCE, processDisplayName=Create Group, inlongGroupId=test_kafka_group, taskId=34, elementName=InitMQ, elementDisplayName=Group-InitMQ, eventType=TaskEvent, event=COMPLETE, listener=QueueResourceListener, startTime=Fri Aug 15 10:03:12 UTC 2025, endTime=null, status=-1, async=0, ip=172.21.0.8, remark=null, exception=failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_kafka_group streamId=test_kafka_stream) error: org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to execute stream process in asynchronously : org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed to start stream process for groupId=test_kafka_group streamId=test_kafka_stream at org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:186) ~[manager-service-2.2.0.jar:2.2.0] at 
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:140) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75) ~[manager-workflow-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66) ~[manager-service-2.2.0.jar:2.2.0] at org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:81) ~[manager-service-2.2.0.jar:2.2.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] ``` ### What you expected to happen 正常配置 ### How to reproduce 使用 docker compose 安装并按照Kafka 示例操作 ### Environment docker 环境 ### InLong version master ### InLong Component InLong Manager ### Are you willing to submit PR? - [ ] Yes, I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org