haifxu opened a new issue, #9205:
URL: https://github.com/apache/inlong/issues/9205

   ### What happened
   
   ```
   [ ] 2023-11-02 08:35:50.457 -ERROR [inlong-plugin-0] 
o.a.i.m.p.f.FlinkService      :146 - submit job from info 
FlinkInfo(endpoint=null, jobName=InLong-Sort-test_group_10, 
inlongStreamInfoList=[InlongStreamInfo(id=13, inlongGroupId=test_group_10, 
inlongStreamId=test_stream_10, name=null, description=, 
mqResource=test_stream_10, dataType=null, dataEncoding=UTF-8, 
dataSeparator=124, dataEscapeChar=null, syncSend=0, dailyRecords=10, 
dailyStorage=10, peakRecords=1000, maxLength=10240, storagePeriod=1, 
extParams={"ignoreParseError":true,"useExtendedFields":false}, status=130, 
previousStatus=100, creator=admin, modifier=admin, createTime=Thu Nov 02 
00:28:51 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 2023, 
fieldList=[StreamField(id=58, inlongGroupId=test_group_10, 
inlongStreamId=test_stream_10, fieldName=id, fieldType=int, fieldComment=null, 
isPredefinedField=null, fieldValue=null, preExpression=null, isMetaField=0, 
metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, 
extParams=null), StreamField(id=59, inlongGroupId=test_group_10, 
inlongStreamId=test_stream_10, fieldName=name, fieldType=string, 
fieldComment=null, isPredefinedField=null, fieldValue=null, preExpression=null, 
isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, 
originFieldName=null, extParams=null)], extList=null, 
sourceList=[MySQLBinlogSource(super=StreamSource(id=12, 
inlongGroupId=test_group_10, inlongStreamId=test_stream_10, 
sourceType=MYSQL_BINLOG, sourceName=test_source_10, agentIp=null, uuid=null, 
inlongClusterName=null, inlongClusterNodeTag=null, dataNodeName=null, 
serializationType=debezium_json, snapshot=null, version=1, status=101, 
previousStatus=110, creator=admin, modifier=admin, createTime=Thu Nov 02 
00:29:16 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 2023, properties={}, 
templateId=null, subSourceList=null, ignoreParseError=false), user=root, 
password=****** hostname=*, port=3306, serverId=0, includeSchema=null, 
databaseWhiteList=test,
  tableWhiteList=test.source_table, serverTimezone=null, intervalMs=500, 
snapshotMode=initial, offsetFilename=null, historyFilename=null, 
monitoredDdl=null, timestampFormatStandard=SQL, allMigration=false, 
primaryKey=null, specificOffsetFile=null, specificOffsetPos=null)], 
sinkList=[HudiSink(super=StreamSink(super=StreamNode(preNodes=null, 
postNodes=null, fieldList=null), id=12, inlongGroupId=test_group_10, 
inlongStreamId=test_stream_10, sinkType=HUDI, sinkName=test_sink_10, 
description=null, inlongClusterName=null, 
dataNodeName=65be8d12-4815-40ed-b52e-57d5a8ecdc5c, sortTaskName=null, 
sortConsumerGroup=null, enableCreateResource=1, operateLog=success to create 
Hudi resource, status=130, previousStatus=130, creator=admin, modifier=admin, 
createTime=Thu Nov 02 00:35:06 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 
2023, sinkFieldList=[SinkField(id=37, sinkType=null, inlongGroupId=null, 
inlongStreamId=null, fieldName=id, fieldType=int, fieldComment=id, 
isMetaField=0, metaFieldName=null, 
 fieldFormat=null, originNodeName=null, originFieldName=null, 
sourceFieldName=id, sourceFieldType=int, extParams=null), SinkField(id=38, 
sinkType=null, inlongGroupId=null, inlongStreamId=null, fieldName=name, 
fieldType=string, fieldComment=name, isMetaField=0, metaFieldName=null, 
fieldFormat=null, originNodeName=null, originFieldName=null, 
sourceFieldName=name, sourceFieldType=string, extParams=null)], properties={}, 
dataEncoding=UTF-8, dataFormat=NONE, authentication=null, version=1), 
catalogType=HIVE, catalogUri=thrift://*:9083, warehouse=hdfs://*/warehouse, 
dbName=test_db, tableName=sink_table, dataPath=null, fileFormat=Parquet, 
partitionType=null, primaryKey=, extList=[], partitionKey=null)], version=3, 
wrapType=INLONG_MSG_V0, useExtendedFields=false, ignoreParseError=true)], 
localJarPath=/opt/inlong-sort/sort-dist-1.10.0-SNAPSHOT.jar, 
connectorJarPaths=[/opt/inlong-sort/connectors/sort-connector-mysql-cdc-1.10.0-SNAPSHOT.jar,
 /opt/inlong-sort/connectors/sort-connector-hudi-1.10.0-SNAPSHOT.jar], 
localConfPath=/opt/inlong-manager/lib/InLong-Sort-test_group_10, 
sourceType=null, sinkType=null, jobId=null, savepointPath=null, 
isException=false, exceptionMsg=null) failed:  
   org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Unable to create a sink for writing table 
'default_catalog.default_database.sink_table'.
   
   Table options are:
   
   'connector'='hudi-inlong'
   'hive_sync.db'='test_db'
   'hive_sync.enabled'='true'
   'hive_sync.metastore.uris'='thrift://*:9083'
   'hive_sync.mode'='hms'
   'hive_sync.table'='sink_table'
   'hoodie.database.name'='test_db'
   'hoodie.datasource.write.recordkey.field'=''
   'hoodie.table.name'='sink_table'
   'inlong.metric.labels'='groupId=test_group_10&streamId=test_stream_10&nodeId=test_sink_10'
   'metrics.audit.key'='16'
   'metrics.audit.proxy.hosts'='audit:10081'
   'path'='hdfs://*/warehouse/test_db.db/sink_table'
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-clients_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-clients_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
 ~[flink-clients_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
 ~[flink-clients_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
 ~[flink-clients_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submitJobBySavepoint(FlinkService.java:192)
 ~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
        at 
org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:144)
 ~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
        at 
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58)
 ~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_342]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create 
a sink for writing table 'default_catalog.default_database.sink_table'.
   
   Table options are:
   
   'connector'='hudi-inlong'
   'hive_sync.db'='test_db'
   'hive_sync.enabled'='true'
   'hive_sync.metastore.uris'='thrift://*:9083'
   'hive_sync.mode'='hms'
   'hive_sync.table'='sink_table'
   'hoodie.database.name'='test_db'
   'hoodie.datasource.write.recordkey.field'=''
   'hoodie.table.name'='sink_table'
   'inlong.metric.labels'='groupId=test_group_10&streamId=test_stream_10&nodeId=test_sink_10'
   'metrics.audit.key'='16'
   'metrics.audit.proxy.hosts'='audit:10081'
   'path'='hdfs://*/warehouse/test_db.db/sink_table'
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
 ~[flink-table-common-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[scala-library-2.11.12.jar:?]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[scala-library-2.11.12.jar:?]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[scala-library-2.11.12.jar:?]
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[scala-library-2.11.12.jar:?]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
 ~[flink-table-api-java-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
 ~[flink-table-api-java-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
 ~[flink-table-api-java-1.13.5.jar:1.13.5]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:84)
 ~[?:?]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63)
 ~[?:?]
        at org.apache.inlong.sort.Entrance.main(Entrance.java:76) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_342]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_342]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_342]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-clients_2.11-1.13.5.jar:1.13.5]
        ... 12 more
   Caused by: org.apache.hudi.exception.HoodieValidationException: Field '' 
specified in option 'hoodie.datasource.write.recordkey.field' does not exist in 
the table schema.
        at 
org.apache.hudi.table.HoodieTableFactory.lambda$sanityCheck$2(HoodieTableFactory.java:139)
 ~[?:?]
        at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_342]
        at 
org.apache.hudi.table.HoodieTableFactory.sanityCheck(HoodieTableFactory.java:137)
 ~[?:?]
        at 
org.apache.hudi.table.HoodieTableFactory.createDynamicTableSink(HoodieTableFactory.java:91)
 ~[?:?]
        at 
org.apache.inlong.sort.hudi.table.HudiTableInlongFactory.createDynamicTableSink(HudiTableInlongFactory.java:51)
 ~[?:?]
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
 ~[flink-table-common-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[scala-library-2.11.12.jar:?]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[scala-library-2.11.12.jar:?]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[scala-library-2.11.12.jar:?]
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[scala-library-2.11.12.jar:?]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[scala-library-2.11.12.jar:?]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
 ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
 ~[flink-table-api-java-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
 ~[flink-table-api-java-1.13.5.jar:1.13.5]
        at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
 ~[flink-table-api-java-1.13.5.jar:1.13.5]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:84)
 ~[?:?]
        at 
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63)
 ~[?:?]
        at org.apache.inlong.sort.Entrance.main(Entrance.java:76) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_342]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_342]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_342]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-clients_2.11-1.13.5.jar:1.13.5]
        ... 12 more
   ```
   
   ### What you expected to happen
   
   A Sort job with a Hudi sink is submitted and starts successfully.
   
   ### How to reproduce
   
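   Create a stream with a Hudi sink and submit it. In the log above, the source is MySQL binlog and the Hudi sink's `primaryKey` is empty.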
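
The innermost exception in the trace above reports that the field `''` given in `hoodie.datasource.write.recordkey.field` does not exist in the table schema, and the table options show that option set to an empty string. The following is a minimal sketch of that kind of check, assuming a simplified validation over a plain list of column names. The class `RecordKeyCheck` and its `validate` method are hypothetical illustrations, not the code of Hudi's `HoodieTableFactory.sanityCheck`.

```java
import java.util.Arrays;
import java.util.List;

class RecordKeyCheck {

    static final String RECORD_KEY_OPTION = "hoodie.datasource.write.recordkey.field";

    // Hypothetical helper (not Hudi's code): rejects any record key field
    // that is not a column of the table schema.
    static void validate(String recordKeyFields, List<String> schemaFields) {
        // Java's split returns [""] for an empty input, so an empty option
        // value is checked as a single field named "".
        for (String field : recordKeyFields.split(",")) {
            if (!schemaFields.contains(field)) {
                throw new IllegalArgumentException("Field '" + field
                        + "' specified in option '" + RECORD_KEY_OPTION
                        + "' does not exist in the table schema.");
            }
        }
    }

    public static void main(String[] args) {
        // Columns of the sink table as listed in the log: id, name.
        List<String> schema = Arrays.asList("id", "name");

        validate("id", schema); // passes: "id" is a column

        try {
            validate("", schema); // empty record key, as in the table options above
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this sketch, any non-empty record key that names an existing column (for example `id`) passes, while the empty value taken from the sink's empty `primaryKey` fails.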
   
   ### Environment
   
   _No response_
   
   ### InLong version
   
   master
   
   ### InLong Component
   
   InLong Manager
   
   ### Are you willing to submit PR?
   
   - [ ] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
