chenxu8989 opened a new issue, #11969:
URL: https://github.com/apache/inlong/issues/11969

   ### What happened
   
   # Installation method
   * Docker Compose
   
   # Installation reference documentation
   https://inlong.apache.org/zh-CN/docs/deployment/docker
   
   # Steps performed
   [Kafka example](https://inlong.apache.org/zh-CN/docs/quick_start/data_ingestion/mysql_kafka_clickhouse_example)
   
   # Exception information
   
   <img width="1040" height="621" alt="Image" src="https://github.com/user-attachments/assets/b265ed4b-2986-4f49-8375-657e7f20cadc" />
   
   ```log
   
   [ ] 2025-08-15 10:03:13.977 - INFO [inlong-mq-process-0] 
o.a.i.m.s.c.i.AuditServiceImpl:310 - success to query audit proxy url list for 
request url =http://audit:10080/audit/query/getAuditProxy?component=Sort 
   [ ] 2025-08-15 10:03:14.254 - INFO [inlong-mq-process-0] 
s.l.s.StreamSortConfigListener:133 - success to build sort config for 
groupId=test_kafka_group, streamId=test_kafka_stream 
   [ ] 2025-08-15 10:03:14.292 - INFO [inlong-mq-process-0] 
.i.m.p.l.StartupStreamListener:74 - inlong stream :test_kafka_stream ext info: 
[InlongStreamExtInfo(id=null, inlongGroupId=test_kafka_group, 
inlongStreamId=test_kafka_stream, keyName=dataflow, 
keyValue={"groupId":"test_kafka_group","streams":[{"streamId":"test_kafka_stream","nodes":[{"type":"kafkaExtract","id":"test_kafka_stream","name":"test_kafka_stream","fields":[{"type":"field","name":"id","nodeId":"test_kafka_stream","formatInfo":{"type":"int"}},{"type":"field","name":"name","nodeId":"test_kafka_stream","formatInfo":{"type":"string"}}],"properties":{"metrics.audit.key":"9","metrics.audit.proxy.hosts":"audit:10081"},"topic":"test_kafka_group.test_kafka_stream","bootstrapServers":"kafka:9092","format":{"type":"inLongMsgFormat","innerFormat":{"type":"debeziumJsonFormat","schemaInclude":false,"ignoreParseErrors":true,"timestampFormatStandard":"SQL","mapNullKeyMode":"DROP","mapNullKeyLiteral":"null","encodeDecimalAsPlainNumb
 
er":true},"ignoreParseErrors":false},"scanStartupMode":"EARLIEST_OFFSET","groupId":"default_cluster_test_kafka_group.test_kafka_stream_consumer_group"},{"type":"clickHouseLoad","id":"test_kafka_sink","name":"test_kafka_sink","fields":[{"type":"field","name":"id","nodeId":"test_kafka_sink","formatInfo":{"type":"long"}},{"type":"field","name":"name","nodeId":"test_kafka_sink","formatInfo":{"type":"string"}}],"fieldRelations":[{"type":"fieldRelation","inputField":{"type":"field","name":"id","formatInfo":{"type":"int"}},"outputField":{"type":"field","name":"id","formatInfo":{"type":"long"}}},{"type":"fieldRelation","inputField":{"type":"field","name":"name","formatInfo":{"type":"string"}},"outputField":{"type":"field","name":"name","formatInfo":{"type":"string"}}}],"properties":{"metrics.audit.key":"10","metrics.audit.proxy.hosts":"audit:10081"},"tableName":"test_ck_db.test_ck_table","url":"jdbc:clickhouse://clickhouse:8123/test_ck_db","userName":"admin","passWord":"******","primaryKey"
 
:"id"}],"relations":[{"type":"baseRelation","inputs":["test_kafka_stream"],"outputs":["test_kafka_sink"]}]}],"properties":{}})]
 
   [ ] 2025-08-15 10:03:14.349 - INFO [inlong-mq-process-0] 
o.a.i.m.p.u.FlinkUtils        :277 - stream ext info: 
[InlongStreamExtInfo(id=null, inlongGroupId=test_kafka_group, 
inlongStreamId=test_kafka_stream, keyName=dataflow, 
keyValue={"groupId":"test_kafka_group","streams":[{"streamId":"test_kafka_stream","nodes":[{"type":"kafkaExtract","id":"test_kafka_stream","name":"test_kafka_stream","fields":[{"type":"field","name":"id","nodeId":"test_kafka_stream","formatInfo":{"type":"int"}},{"type":"field","name":"name","nodeId":"test_kafka_stream","formatInfo":{"type":"string"}}],"properties":{"metrics.audit.key":"9","metrics.audit.proxy.hosts":"audit:10081"},"topic":"test_kafka_group.test_kafka_stream","bootstrapServers":"kafka:9092","format":{"type":"inLongMsgFormat","innerFormat":{"type":"debeziumJsonFormat","schemaInclude":false,"ignoreParseErrors":true,"timestampFormatStandard":"SQL","mapNullKeyMode":"DROP","mapNullKeyLiteral":"null","encodeDecimalAsPlainNumber":true},"ignoreParseErr
 
ors":false},"scanStartupMode":"EARLIEST_OFFSET","groupId":"default_cluster_test_kafka_group.test_kafka_stream_consumer_group"},{"type":"clickHouseLoad","id":"test_kafka_sink","name":"test_kafka_sink","fields":[{"type":"field","name":"id","nodeId":"test_kafka_sink","formatInfo":{"type":"long"}},{"type":"field","name":"name","nodeId":"test_kafka_sink","formatInfo":{"type":"string"}}],"fieldRelations":[{"type":"fieldRelation","inputField":{"type":"field","name":"id","formatInfo":{"type":"int"}},"outputField":{"type":"field","name":"id","formatInfo":{"type":"long"}}},{"type":"fieldRelation","inputField":{"type":"field","name":"name","formatInfo":{"type":"string"}},"outputField":{"type":"field","name":"name","formatInfo":{"type":"string"}}}],"properties":{"metrics.audit.key":"10","metrics.audit.proxy.hosts":"audit:10081"},"tableName":"test_ck_db.test_ck_table","url":"jdbc:clickhouse://clickhouse:8123/test_ck_db","userName":"admin","passWord":"******","primaryKey":"id"}],"relations":[{"ty
 
pe":"baseRelation","inputs":["test_kafka_stream"],"outputs":["test_kafka_sink"]}]}],"properties":{}})]
 
   [ ] 2025-08-15 10:03:14.405 - INFO [inlong-mq-process-0] 
o.a.i.m.p.f.FlinkOperation    :208 - gen path from 
/opt/inlong-manager/lib/manager-plugins-base-2.2.0.jar 
   [ ] 2025-08-15 10:03:14.406 - INFO [inlong-mq-process-0] 
o.a.i.m.p.f.FlinkOperation    :227 - get sort jar path success, path: 
/opt/inlong-sort/sort-dist-2.2.0.jar 
   [ ] 2025-08-15 10:03:14.413 - INFO [inlong-mq-process-0] 
o.a.i.m.p.f.FlinkOperation    :254 - get sort connector paths success, paths: 
[/opt/inlong-sort/connectors/sort-connector-jdbc-v1.15-2.2.0.jar, 
/opt/inlong-sort/connectors/sort-connector-kafka-v1.15-2.2.0.jar] 
   [ ] 2025-08-15 10:03:14.419 - INFO [inlong-mq-process-0] 
o.a.i.m.p.u.FlinkUtils        :207 - Start to load Flink config from file: 
/opt/inlong-manager/plugins/flink-sort-plugin.properties 
   [ ] 2025-08-15 10:03:14.481 -ERROR [inlong-plugin-0] 
.p.f.FlinkParallelismOptimizer:86 - Error retrieving data volume: null 
   java.lang.NullPointerException: null
        at 
org.apache.inlong.manager.plugin.flink.FlinkParallelismOptimizer.getAverageDataVolume(FlinkParallelismOptimizer.java:125)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.flink.FlinkParallelismOptimizer.calculateRecommendedParallelism(FlinkParallelismOptimizer.java:83)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submitJobBySavepoint(FlinkService.java:226)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:179)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   [ ] 2025-08-15 10:03:14.482 - INFO [inlong-plugin-0] 
.p.f.FlinkParallelismOptimizer:92 - Calculated parallelism: 0 for data volume: 
0 
   [ ] 2025-08-15 10:03:14.482 - INFO [inlong-plugin-0] 
o.a.i.m.p.f.FlinkService      :235 - current parallelism: 1 
   [ ] 2025-08-15 10:03:14.839 - INFO [inlong-operation-log-0] 
o.a.i.m.s.o.OperationLogPool  :92 - receive 1 logs and saved cost 17 ms 
   [ ] 2025-08-15 10:03:18.615 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :143 - start parse group, 
groupId:test_kafka_group 
   [ ] 2025-08-15 10:03:18.616 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :166 - start parse stream, 
streamId:test_kafka_stream 
   [ ] 2025-08-15 10:03:18.618 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :229 - start parse node relation, 
relation:NodeRelation(inputs=[test_kafka_stream], outputs=[test_kafka_sink]) 
   [ ] 2025-08-15 10:03:18.619 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :292 - start parse node, node 
id:test_kafka_stream 
   [ ] 2025-08-15 10:03:18.654 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :294 - node id:test_kafka_stream, create table 
sql:
   CREATE TABLE `table_test_kafka_stream`(
       `id` INT,
       `name` STRING)
       WITH (
       'metrics.audit.key' = '9',
       'metrics.audit.proxy.hosts' = 'audit:10081',
       'inlong.metric.labels' = 
'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream',
       'topic' = 'test_kafka_group.test_kafka_stream',
       'properties.bootstrap.servers' = 'kafka:9092',
       'connector' = 'kafka-inlong',
       'inlong-msg.debezium-json.schema-include' = 'false',
       'inlong-msg.debezium-json.encode.decimal-as-plain-number' = 'true',
       'inlong-msg.inner.format' = 'debezium-json',
       'format' = 'inlong-msg',
       'inlong-msg.debezium-json.timestamp-format.standard' = 'SQL',
       'inlong-msg.debezium-json.ignore-parse-errors' = 'true',
       'inlong-msg.debezium-json.map-null-key.literal' = 'null',
       'inlong-msg.debezium-json.map-null-key.mode' = 'DROP',
       'inlong-msg.ignore-parse-errors' = 'false',
       'scan.startup.mode' = 'earliest-offset',
       'properties.group.id' = 
'default_cluster_test_kafka_group.test_kafka_stream_consumer_group'
   ) 
   [ ] 2025-08-15 10:03:18.660 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :318 - parse node success, node 
id:test_kafka_stream 
   [ ] 2025-08-15 10:03:18.665 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :301 - node id:test_kafka_sink, create table sql:
   CREATE TABLE `test_ck_db.test_ck_table`(
       PRIMARY KEY (`id`) NOT ENFORCED,
       `id` BIGINT,
       `name` STRING)
       WITH (
       'metrics.audit.key' = '10',
       'metrics.audit.proxy.hosts' = 'audit:10081',
       'inlong.metric.labels' = 
'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_sink',
       'connector' = 'jdbc-inlong',
       'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect',
       'url' = 'jdbc:clickhouse://clickhouse:8123/test_ck_db',
       'table-name' = 'test_ck_db.test_ck_table',
       'username' = 'admin',
       'password' = '******'
   ) 
   [ ] 2025-08-15 10:03:18.667 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :318 - parse node success, node 
id:test_kafka_sink 
   [ ] 2025-08-15 10:03:18.668 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :246 - parse node relation success, 
relation:NodeRelation(inputs=[test_kafka_stream], outputs=[test_kafka_sink]) 
   [ ] 2025-08-15 10:03:18.669 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :183 - parse stream success, 
streamId:test_kafka_stream 
   [ ] 2025-08-15 10:03:18.669 - INFO [inlong-plugin-0] 
o.a.i.s.p.i.FlinkSqlParser    :147 - parse group success, 
groupId:test_kafka_group 
   [ ] 2025-08-15 10:03:18.680 - INFO [inlong-plugin-0] 
.a.i.s.p.r.FlinkSqlParseResult:89 - execute createSql:
   CREATE TABLE `table_test_kafka_stream`(
       `id` INT,
       `name` STRING)
       WITH (
       'metrics.audit.key' = '9',
       'metrics.audit.proxy.hosts' = 'audit:10081',
       'inlong.metric.labels' = 
'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream',
       'topic' = 'test_kafka_group.test_kafka_stream',
       'properties.bootstrap.servers' = 'kafka:9092',
       'connector' = 'kafka-inlong',
       'inlong-msg.debezium-json.schema-include' = 'false',
       'inlong-msg.debezium-json.encode.decimal-as-plain-number' = 'true',
       'inlong-msg.inner.format' = 'debezium-json',
       'format' = 'inlong-msg',
       'inlong-msg.debezium-json.timestamp-format.standard' = 'SQL',
       'inlong-msg.debezium-json.ignore-parse-errors' = 'true',
       'inlong-msg.debezium-json.map-null-key.literal' = 'null',
       'inlong-msg.debezium-json.map-null-key.mode' = 'DROP',
       'inlong-msg.ignore-parse-errors' = 'false',
       'scan.startup.mode' = 'earliest-offset',
       'properties.group.id' = 
'default_cluster_test_kafka_group.test_kafka_stream_consumer_group'
   ) 
   [ ] 2025-08-15 10:03:19.035 - INFO [inlong-plugin-0] 
.a.i.s.p.r.FlinkSqlParseResult:89 - execute createSql:
   CREATE TABLE `test_ck_db.test_ck_table`(
       PRIMARY KEY (`id`) NOT ENFORCED,
       `id` BIGINT,
       `name` STRING)
       WITH (
       'metrics.audit.key' = '10',
       'metrics.audit.proxy.hosts' = 'audit:10081',
       'inlong.metric.labels' = 
'groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_sink',
       'connector' = 'jdbc-inlong',
       'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect',
       'url' = 'jdbc:clickhouse://clickhouse:8123/test_ck_db',
       'table-name' = 'test_ck_db.test_ck_table',
       'username' = 'admin',
       'password' = '******'
   ) 
   [ ] 2025-08-15 10:03:19.053 - INFO [inlong-plugin-0] 
.a.i.s.p.r.FlinkSqlParseResult:81 - execute loadSql:
   INSERT INTO `test_ck_db.test_ck_table`
       SELECT 
       CAST(`id` as BIGINT) AS `id`,
       `name` AS `name`
       FROM `table_test_kafka_stream`  
   [ ] 2025-08-15 10:03:19.424 - INFO [inlong-plugin-0] 
s.k.t.KafkaDynamicTableFactory:170 - valueFormatPrefix is inlong-msg 
   [ ] 2025-08-15 10:03:19.470 -ERROR [inlong-plugin-0] 
o.a.i.m.p.f.FlinkService      :181 - submit job from info 
FlinkInfo(endpoint=null, 
jobName=InLong-Sort-test_kafka_group-test_kafka_stream, 
inlongStreamInfoList=[InlongStreamInfo(id=5, inlongGroupId=test_kafka_group, 
inlongStreamId=test_kafka_stream, name=null, description=, 
mqResource=test_kafka_stream, dataType=CSV, dataEncoding=UTF-8, 
dataSeparator=124, dataEscapeChar=null, syncSend=0, dailyRecords=10, 
dailyStorage=10, peakRecords=1000, maxLength=10240, storagePeriod=1, 
extParams={"ignoreParseError":true,"kvSeparator":null,"lineSeparator":null,"useExtendedFields":false,"predefinedFields":"","sinkMultipleEnable":false,"extendedFieldSize":0},
 status=110, previousStatus=100, creator=dev, modifier=admin, createTime=Fri 
Aug 15 01:55:56 UTC 2025, modifyTime=Fri Aug 15 02:03:12 UTC 2025, 
fieldList=[StreamField(id=9, inlongGroupId=test_kafka_group, 
inlongStreamId=test_kafka_stream, fieldName=id, fieldType=int, fieldComment=id, 
isPredefinedField=0, fieldValue=null, preExpression=null, isMetaField=0, 
metaFieldName=null, fieldFormat=null, originNodeName=null, 
originFieldName=null, extParams=null), StreamField(id=10, 
inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, 
fieldName=name, fieldType=string, fieldComment=name, isPredefinedField=0, 
fieldValue=null, preExpression=null, isMetaField=0, metaFieldName=null, 
fieldFormat=null, originNodeName=null, originFieldName=null, extParams=null)], 
extList=[InlongStreamExtInfo(id=null, inlongGroupId=test_kafka_group, 
inlongStreamId=test_kafka_stream, keyName=dataflow, 
keyValue={"groupId":"test_kafka_group","streams":[{"streamId":"test_kafka_stream","nodes":[{"type":"kafkaExtract","id":"test_kafka_stream","name":"test_kafka_stream","fields":[{"type":"field","name":"id","nodeId":"test_kafka_stream","formatInfo":{"type":"int"}},{"type":"field","name":"name","nodeId":"test_kafka_stream","formatInfo":{"type":"string"}}],"properties":{"metrics.audit.key":"9","metrics.
 
audit.proxy.hosts":"audit:10081"},"topic":"test_kafka_group.test_kafka_stream","bootstrapServers":"kafka:9092","format":{"type":"inLongMsgFormat","innerFormat":{"type":"debeziumJsonFormat","schemaInclude":false,"ignoreParseErrors":true,"timestampFormatStandard":"SQL","mapNullKeyMode":"DROP","mapNullKeyLiteral":"null","encodeDecimalAsPlainNumber":true},"ignoreParseErrors":false},"scanStartupMode":"EARLIEST_OFFSET","groupId":"default_cluster_test_kafka_group.test_kafka_stream_consumer_group"},{"type":"clickHouseLoad","id":"test_kafka_sink","name":"test_kafka_sink","fields":[{"type":"field","name":"id","nodeId":"test_kafka_sink","formatInfo":{"type":"long"}},{"type":"field","name":"name","nodeId":"test_kafka_sink","formatInfo":{"type":"string"}}],"fieldRelations":[{"type":"fieldRelation","inputField":{"type":"field","name":"id","formatInfo":{"type":"int"}},"outputField":{"type":"field","name":"id","formatInfo":{"type":"long"}}},{"type":"fieldRelation","inputField":{"type":"field","name
 
":"name","formatInfo":{"type":"string"}},"outputField":{"type":"field","name":"name","formatInfo":{"type":"string"}}}],"properties":{"metrics.audit.key":"10","metrics.audit.proxy.hosts":"audit:10081"},"tableName":"test_ck_db.test_ck_table","url":"jdbc:clickhouse://clickhouse:8123/test_ck_db","userName":"admin","passWord":"******","primaryKey":"id"}],"relations":[{"type":"baseRelation","inputs":["test_kafka_stream"],"outputs":["test_kafka_sink"]}]}],"properties":{}})],
 sourceList=[MySQLBinlogSource(super=StreamSource(id=5, 
inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, 
sourceType=MYSQL_BINLOG, sourceName=test_kafka_source, agentIp=null, uuid=null, 
inlongClusterName=null, inlongClusterNodeTag=null, dataNodeName=null, 
serializationType=debezium_json, snapshot=null, dataTimeZone=null, version=1, 
status=110, previousStatus=null, creator=dev, modifier=dev, createTime=Fri Aug 
15 01:59:11 UTC 2025, modifyTime=Fri Aug 15 01:59:11 UTC 2025, properties={}, 
taskMapId=null, auditVersion=null, dataAddTaskList=null, ignoreParseError=null), 
user=test_mysql_db, password=****** hostname=mysql, port=3306, serverId=0, 
includeSchema=null, databaseWhiteList=test_mysql_db, 
tableWhiteList=test_mysql_db.test_mysql_table, serverTimezone=UTC, 
intervalMs=1000, snapshotMode=initial, offsetFilename=null, 
historyFilename=/data/inlong-agent/.history, monitoredDdl=null, 
timestampFormatStandard=SQL, allMigration=false, onlyIncremental=true, 
primaryKey=null, specificOffsetFile=null, specificOffsetPos=null)], 
sinkList=[ClickHouseSink(super=StreamSink(super=StreamNode(preNodes=null, 
postNodes=null, fieldList=[StreamField(id=9, inlongGroupId=test_kafka_group, 
inlongStreamId=test_kafka_stream, fieldName=id, fieldType=int, fieldComment=id, 
isPredefinedField=0, fieldValue=null, preExpression=null, isMetaField=0, 
metaFieldName=null, fieldFormat=null, originNodeName=null, 
originFieldName=null, extParams=null), StreamField(id=10, 
inlongGroupId=test_kafka_group, inlongStreamId=test_kafka_stream, fieldName=name, fieldType=string, fieldComment=name, 
isPredefinedField=0, fieldValue=null, preExpression=null, isMetaField=0, 
metaFieldName=null, fieldFormat=null, originNodeName=null, 
originFieldName=null, extParams=null)]), id=5, inlongGroupId=test_kafka_group, 
inlongStreamId=test_kafka_stream, sinkType=CLICKHOUSE, 
sinkName=test_kafka_sink, transformSql=null, description=, 
inlongClusterName=null, dataNodeName=5924d85a-57bc-4d0c-a86a-dbe6350cf313, 
sortTaskName=null, sortConsumerGroup=null, enableCreateResource=1, 
enableDataArchiving=null, operateLog=success to create clickhouse resource, 
status=110, previousStatus=100, creator=dev, modifier=admin, createTime=Fri Aug 
15 01:55:56 UTC 2025, modifyTime=Fri Aug 15 02:03:12 UTC 2025, 
sinkFieldList=[SinkField(id=9, sinkType=null, inlongGroupId=null, 
inlongStreamId=null, fieldName=id, fieldType=Int64, fieldComment=id, 
isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, 
originFieldName=null, sourceFieldName=id, sourceFieldType=int, 
extParams={"sinkType":"CLICKHOUSE","id":null,"sinkType":"CLICKHOUSE","inlongGroupId":null,"inlongStreamId":null,"fieldName":"id","fieldType":"Int64","fieldComment":null,"isMetaField":0,"metaFieldName":null,"fieldFormat":null,"originNodeName":null,"originFieldName":null,"sourceFieldName":"id","sourceFieldType":"int","extParams":null,"defaultType":null,"defaultExpr":null,"compressionCode":null,"ttlExpr":null}),
 SinkField(id=10, sinkType=null, inlongGroupId=null, inlongStreamId=null, 
fieldName=name, fieldType=String, fieldComment=name, isMetaField=0, 
metaFieldName=null, fieldFormat=null, originNodeName=null, 
originFieldName=null, sourceFieldName=name, sourceFieldType=string, 
extParams={"sinkType":"CLICKHOUSE","id":null,"sinkType":"CLICKHOUSE","inlongGroupId":null,"inlongStreamId":null,"fieldName":"name","fieldType":"String","fieldComment":null,"isMetaField":0,"metaFieldName":null,"fieldFormat":null,"originNodeName":null,"originFieldName":null,"sourceFieldName":
 
"name","sourceFieldType":"string","extParams":null,"defaultType":null,"defaultExpr":null,"compressionCode":null,"ttlExpr":null})],
 properties={metrics.audit.key=10, metrics.audit.proxy.hosts=audit:10081}, 
dataEncoding=UTF-8, dataFormat=NONE, authentication=null, version=1), 
jdbcUrl=jdbc:clickhouse://clickhouse:8123, username=admin, password=****** 
dbName=test_ck_db, tableName=test_ck_table, flushInterval=1, flushRecord=1000, 
retryTimes=3, isDistributed=0, partitionStrategy=null, partitionFields=null, 
keyFieldNames=null, engine=MergeTree, partitionBy=null, orderBy=null, ttl=null, 
ttlUnit=null, cluster=null, primaryKey=id)], version=1, wrapType=INLONG_MSG_V0, 
useExtendedFields=false, extendedFieldSize=0, sinkMultipleEnable=false, 
syncField=false, ignoreParseError=true)], 
localJarPath=/opt/inlong-sort/sort-dist-2.2.0.jar, 
connectorJarPaths=[/opt/inlong-sort/connectors/sort-connector-jdbc-v1.15-2.2.0.jar,
 /opt/inlong-sort/connectors/sort-connector-kafka-v1.15-2.2.0.jar], 
localConfPath=/opt/inlong-manager/lib/InLong-Sort-test_kafka_group-test_kafka_stream, 
sourceType=null, sinkType=null, jobId=null, savepointPath=null, 
isException=false, exceptionMsg=null, runtimeExecutionMode=stream, 
boundaryType=null, lowerBoundary=null, upperBoundary=null) failed:  
   org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Unable to create a source for reading table 
'default_catalog.default_database.table_test_kafka_stream'.
   
   Table options are:
   
   'connector'='kafka-inlong'
   'format'='inlong-msg'
   'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true'
   'inlong-msg.debezium-json.ignore-parse-errors'='true'
   'inlong-msg.debezium-json.map-null-key.literal'='null'
   'inlong-msg.debezium-json.map-null-key.mode'='DROP'
   'inlong-msg.debezium-json.schema-include'='false'
   'inlong-msg.debezium-json.timestamp-format.standard'='SQL'
   'inlong-msg.ignore-parse-errors'='false'
   'inlong-msg.inner.format'='debezium-json'
   
'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream'
   'metrics.audit.key'='9'
   'metrics.audit.proxy.hosts'='audit:10081'
   'properties.bootstrap.servers'='kafka:9092'
   
'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group'
   'scan.startup.mode'='earliest-offset'
   'topic'='test_kafka_group.test_kafka_stream'
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-clients-1.15.4.jar:1.15.4]
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-clients-1.15.4.jar:1.15.4]
        at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
 ~[flink-clients-1.15.4.jar:1.15.4]
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
 ~[flink-clients-1.15.4.jar:1.15.4]
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
 ~[flink-clients-1.15.4.jar:1.15.4]
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submitJobBySavepoint(FlinkService.java:244)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:179)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create 
a source for reading table 
'default_catalog.default_database.table_test_kafka_stream'.
   
   Table options are:
   
   'connector'='kafka-inlong'
   'format'='inlong-msg'
   'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true'
   'inlong-msg.debezium-json.ignore-parse-errors'='true'
   'inlong-msg.debezium-json.map-null-key.literal'='null'
   'inlong-msg.debezium-json.map-null-key.mode'='DROP'
   'inlong-msg.debezium-json.schema-include'='false'
   'inlong-msg.debezium-json.timestamp-format.standard'='SQL'
   'inlong-msg.ignore-parse-errors'='false'
   'inlong-msg.inner.format'='debezium-json'
   
'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream'
   'metrics.audit.key'='9'
   'metrics.audit.proxy.hosts'='audit:10081'
   'properties.bootstrap.servers'='kafka:9092'
   
'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group'
   'scan.startup.mode'='earliest-offset'
   'topic'='test_kafka_group.test_kafka_stream'
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159)
 ~[flink-table-common-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184)
 ~[flink-table-common-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) 
~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:353)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:763)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:322)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) 
~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:62)
 ~[flink-table-api-java-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:45)
 ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:30)
 ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:82)
 ~[?:?]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63)
 ~[?:?]
        at org.apache.inlong.sort.Entrance.main(Entrance.java:101) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_342]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_342]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_342]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-clients-1.15.4.jar:1.15.4]
        ... 12 more
   Caused by: java.lang.NullPointerException
        at 
org.apache.inlong.sort.kafka.shaded.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903)
 ~[?:?]
        at 
org.apache.inlong.sort.kafka.shaded.com.google.common.base.Splitter.split(Splitter.java:387)
 ~[?:?]
        at 
org.apache.inlong.sort.kafka.shaded.com.google.common.base.Splitter$MapSplitter.split(Splitter.java:518)
 ~[?:?]
        at 
org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.util.AuditUtils.extractChangelogAuditKeyMap(AuditUtils.java:59)
 ~[?:?]
        at 
org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base.metric.MetricOption$Builder.build(MetricOption.java:262)
 ~[?:?]
        at 
org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:209)
 ~[?:?]
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156)
 ~[flink-table-common-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184)
 ~[flink-table-common-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) 
~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:353)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:763)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:322)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
 ~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) 
~[flink-table-planner_2.12-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:62)
 ~[flink-table-api-java-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:45)
 ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4]
        at 
org.apache.flink.table.api.bridge.java.internal.StreamStatementSetImpl.addInsertSql(StreamStatementSetImpl.java:30)
 ~[flink-table-api-java-bridge-1.15.4.jar:1.15.4]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:82)
 ~[?:?]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63)
 ~[?:?]
        at org.apache.inlong.sort.Entrance.main(Entrance.java:101) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_342]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_342]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_342]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-clients-1.15.4.jar:1.15.4]
        ... 12 more
   [ ] 2025-08-15 10:03:20.198 - WARN [inlong-plugin-0] 
.i.m.p.f.IntegrationTaskRunner:64 - Start job null failed in backend 
exception[java.lang.Exception: submit job failed: The main method caused an 
error: Unable to create a source for reading table 
'default_catalog.default_database.table_test_kafka_stream'.
   
   Table options are:
   
   'connector'='kafka-inlong'
   'format'='inlong-msg'
   'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true'
   'inlong-msg.debezium-json.ignore-parse-errors'='true'
   'inlong-msg.debezium-json.map-null-key.literal'='null'
   'inlong-msg.debezium-json.map-null-key.mode'='DROP'
   'inlong-msg.debezium-json.schema-include'='false'
   'inlong-msg.debezium-json.timestamp-format.standard'='SQL'
   'inlong-msg.ignore-parse-errors'='false'
   'inlong-msg.inner.format'='debezium-json'
   
'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream'
   'metrics.audit.key'='9'
   'metrics.audit.proxy.hosts'='audit:10081'
   'properties.bootstrap.servers'='kafka:9092'
   
'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group'
   'scan.startup.mode'='earliest-offset'
   'topic'='test_kafka_group.test_kafka_stream'
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:182)
        at 
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ] 
   [ ] 2025-08-15 10:03:20.216 - INFO [inlong-mq-process-0] 
o.a.i.m.p.u.FlinkUtils        :316 - job submit success for groupId = 
test_kafka_group, streamId = test_kafka_stream, jobId = null 
   [ ] 2025-08-15 10:03:20.217 -ERROR [inlong-mq-process-0] 
a.i.m.w.e.LogableEventListener:88 - execute listener 
WorkflowEventLogEntity(id=null, processId=19, 
processName=CREATE_STREAM_RESOURCE, processDisplayName=Create Stream, 
inlongGroupId=test_kafka_group, taskId=37, elementName=InitSort, 
elementDisplayName=Stream-InitSort, eventType=TaskEvent, event=COMPLETE, 
listener=StartupStreamListener, startTime=Fri Aug 15 10:03:14 UTC 2025, 
endTime=null, status=-1, async=0, ip=172.21.0.8, remark=null, exception=startup 
failed: Start job null failed in backend exception[java.lang.Exception: submit 
job failed: The main method caused an error: Unable to create a source for 
reading table 'default_catalog.default_database.table_test_kafka_stream'.
   
   Table options are:
   
   'connector'='kafka-inlong'
   'format'='inlong-msg'
   'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true'
   'inlong-msg.debezium-json.ignore-parse-errors'='true'
   'inlong-msg.debezium-json.map-null-key.literal'='null'
   'inlong-msg.debezium-json.map-null-key.mode'='DROP'
   'inlong-msg.debezium-json.schema-include'='false'
   'inlong-msg.debezium-json.timestamp-format.standard'='SQL'
   'inlong-msg.ignore-parse-errors'='false'
   'inlong-msg.inner.format'='debezium-json'
   
'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream'
   'metrics.audit.key'='9'
   'metrics.audit.proxy.hosts'='audit:10081'
   'properties.bootstrap.servers'='kafka:9092'
   
'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group'
   'scan.startup.mode'='earliest-offset'
   'topic'='test_kafka_group.test_kafka_stream'
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:182)
        at 
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ]) error:  
   org.apache.inlong.manager.common.exceptions.BusinessException: startup 
failed: Start job null failed in backend exception[java.lang.Exception: submit 
job failed: The main method caused an error: Unable to create a source for 
reading table 'default_catalog.default_database.table_test_kafka_stream'.
   
   Table options are:
   
   'connector'='kafka-inlong'
   'format'='inlong-msg'
   'inlong-msg.debezium-json.encode.decimal-as-plain-number'='true'
   'inlong-msg.debezium-json.ignore-parse-errors'='true'
   'inlong-msg.debezium-json.map-null-key.literal'='null'
   'inlong-msg.debezium-json.map-null-key.mode'='DROP'
   'inlong-msg.debezium-json.schema-include'='false'
   'inlong-msg.debezium-json.timestamp-format.standard'='SQL'
   'inlong-msg.ignore-parse-errors'='false'
   'inlong-msg.inner.format'='debezium-json'
   
'inlong.metric.labels'='groupId=test_kafka_group&streamId=test_kafka_stream&nodeId=test_kafka_stream'
   'metrics.audit.key'='9'
   'metrics.audit.proxy.hosts'='audit:10081'
   'properties.bootstrap.servers'='kafka:9092'
   
'properties.group.id'='default_cluster_test_kafka_group.test_kafka_stream_consumer_group'
   'scan.startup.mode'='earliest-offset'
   'topic'='test_kafka_group.test_kafka_stream'
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:182)
        at 
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ]
        at 
org.apache.inlong.manager.plugin.flink.FlinkOperation.pollJobStatus(FlinkOperation.java:308)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJob(FlinkUtils.java:330)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJob(FlinkUtils.java:262)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.plugin.listener.StartupStreamListener.listen(StartupStreamListener.java:77)
 ~[manager-plugins-base-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$0(QueueResourceListener.java:165)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   [ ] 2025-08-15 10:03:20.264 - INFO [inlong-mq-process-0] 
.m.s.s.InlongStreamServiceImpl:669 - success to update stream after approve for 
groupId=test_kafka_group, streamId=test_kafka_stream 
   [ ] 2025-08-15 10:03:20.270 -ERROR [inlong-mq-process-0] 
.m.s.l.q.QueueResourceListener:174 - failed to start stream process for 
groupId=test_kafka_group streamId=test_kafka_stream 
   [ ] 2025-08-15 10:03:20.272 -ERROR [inlong-workflow-0] 
.m.s.l.q.QueueResourceListener:185 - failed to execute stream process in 
asynchronously  
   java.util.concurrent.ExecutionException: 
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed 
to start stream process for groupId=test_kafka_group streamId=test_kafka_stream
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_342]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) 
~[?:1.8.0_342]
        at 
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:182)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:140)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:81)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   Caused by: 
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed 
to start stream process for groupId=test_kafka_group streamId=test_kafka_stream
        at 
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$1(QueueResourceListener.java:175)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_342]
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_342]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
~[?:1.8.0_342]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
 ~[?:1.8.0_342]
        ... 3 more
   [ ] 2025-08-15 10:03:20.274 -ERROR [inlong-workflow-0] 
a.i.m.w.e.LogableEventListener:88 - execute listener 
WorkflowEventLogEntity(id=null, processId=18, 
processName=CREATE_GROUP_RESOURCE, processDisplayName=Create Group, 
inlongGroupId=test_kafka_group, taskId=34, elementName=InitMQ, 
elementDisplayName=Group-InitMQ, eventType=TaskEvent, event=COMPLETE, 
listener=QueueResourceListener, startTime=Fri Aug 15 10:03:12 UTC 2025, 
endTime=null, status=-1, async=0, ip=172.21.0.8, remark=null, exception=failed 
to execute stream process in asynchronously : 
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed 
to start stream process for groupId=test_kafka_group 
streamId=test_kafka_stream) error:  
   org.apache.inlong.manager.common.exceptions.WorkflowListenerException: 
failed to execute stream process in asynchronously : 
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed 
to start stream process for groupId=test_kafka_group streamId=test_kafka_stream
        at 
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:186)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:140)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75)
 ~[manager-workflow-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:81)
 ~[manager-service-2.2.0.jar:2.2.0]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   ```
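   
   # Preliminary analysis
   
   The job fails before the Kafka source is ever created. The innermost `Caused by: java.lang.NullPointerException` is thrown from `Preconditions.checkNotNull` inside the shaded Guava `Splitter`, reached via `AuditUtils.extractChangelogAuditKeyMap` when `MetricOption$Builder.build()` runs in `KafkaDynamicTableFactory.createDynamicTableSource`. That pattern matches Guava's `MapSplitter` being handed a null input string, i.e. some audit key value resolves to null even though `metrics.audit.key = '9'` appears in the table options. The earlier NPE in `FlinkParallelismOptimizer.getAverageDataVolume` looks like the same class of problem (a null audit query result being dereferenced), although the Manager recovers from that one by falling back to parallelism 1.
   
   A minimal sketch of the suspected failure mode, using plain (unshaded) Guava; the `changelogAuditKeys` variable and the `&`/`=` separator characters are hypothetical and only stand in for whatever value `AuditUtils` actually reads:
   
   ```java
   import com.google.common.base.Splitter;
   
   import java.util.Map;
   
   public class MapSplitterNpeSketch {
   
       public static void main(String[] args) {
           // A well-formed key/value string splits fine, e.g. "9=1&10=2".
           Map<String, String> ok = Splitter.on('&')
                   .withKeyValueSeparator('=')
                   .split("9=1&10=2");
           System.out.println(ok); // prints {9=1, 10=2}
   
           // But MapSplitter.split(null) fails fast in Preconditions.checkNotNull,
           // matching the frames above (Splitter.split -> MapSplitter.split ->
           // Preconditions.checkNotNull).
           String changelogAuditKeys = null; // hypothetical: the option resolved to null
           Splitter.on('&').withKeyValueSeparator('=').split(changelogAuditKeys);
       }
   }
   ```
   
   If that reading is right, a null guard (or a default empty map) around the audit key lookup in `AuditUtils.extractChangelogAuditKeyMap` would let the source factory proceed.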
   
   
   ### What you expected to happen
   
   The Kafka example should be configured successfully and the Sort job should start normally.
   
   ### How to reproduce
   
   Install with Docker Compose and follow the Kafka example linked above.
   
   ### Environment
   
   Docker environment
   
   ### InLong version
   
   master
   
   ### InLong Component
   
   InLong Manager
   
   ### Are you willing to submit PR?
   
   - [ ] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

