li-ang-666 opened a new issue, #9832:
URL: https://github.com/apache/hudi/issues/9832

   The bulk_insert job is:
   CREATE TABLE source_table(
     id                DECIMAL(20,0),
     graph_id          BIGINT,
     base              STRING,
     name              STRING,
     legal_person_id   BIGINT,
     legal_person_name STRING,
     legal_person_type INT,
     code              STRING,
     reg_number        STRING,
     company_org_type  STRING,
     reg_location      STRING,
     establish_date    DATE,
     from_date         DATE,
     to_date           DATE,
     business_scope    STRING,
     reg_institute     STRING,
     approved_date     DATE,
     reg_status        STRING,
     reg_capital       STRING,
     org_number        STRING,
     source_flag       STRING,
     crawled_time      TIMESTAMP(3),
     deleted           TINYINT,
     create_time       TIMESTAMP(3),
     update_time       TIMESTAMP(3),
     op_ts as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://xxxxx:3306/prism?useSSL=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false',
     'table-name' = 'enterprise',
     'username' = 'xxxxx',
     'password' = '2s0^xxxxx',
     'scan.partition.column' = 'id',
     'scan.partition.num' = '300000',
     'scan.partition.lower-bound' = '1',
     'scan.partition.upper-bound' = '3000000000',
     'scan.fetch-size' = '1024'
   );
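   (For reference: with the scan.partition.* options above, the Flink JDBC
   source splits the snapshot read into 300000 parallel range scans over id,
   each roughly 10000 ids wide. A sketch of one split's query follows; the
   connector generates the actual statement, and the bounds shown are only
   illustrative:)

   -- approximate shape of a single JDBC split, width = (3000000000 - 1 + 1) / 300000
   SELECT * FROM enterprise WHERE id BETWEEN 1 AND 10000;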
   create table enterprise(
     id                DECIMAL(20,0),
     graph_id          BIGINT,
     base              STRING,
     name              STRING,
     legal_person_id   BIGINT,
     legal_person_name STRING,
     legal_person_type INT,
     code              STRING,
     reg_number        STRING,
     company_org_type  STRING,
     reg_location      STRING,
     establish_date    DATE,
     from_date         DATE,
     to_date           DATE,
     business_scope    STRING,
     reg_institute     STRING,
     approved_date     DATE,
     reg_status        STRING,
     reg_capital       STRING,
     org_number        STRING,
     source_flag       STRING,
     crawled_time      TIMESTAMP(3),
     deleted           TINYINT,
     create_time       TIMESTAMP(3),
     update_time       TIMESTAMP(3),
     op_ts             TIMESTAMP(3),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'hudi',
     'path' = 'obs://hadoop-obs/hudi_ods/enterprise011',
     'table.type' = 'MERGE_ON_READ',
     -- cdc
     'changelog.enabled' = 'true',
     -- index
     'index.type' = 'BUCKET',
     'hoodie.bucket.index.num.buckets' = '128',
     -- write
     'write.operation' = 'bulk_insert',
     'write.bulk_insert.shuffle_input' = 'false',
     'write.bulk_insert.sort_input' = 'false',
     'write.tasks' = '64',
     'write.precombine' = 'true',
     'write.precombine.field' = 'op_ts',
     -- compaction
     'compaction.schedule.enabled' = 'false',
     'compaction.async.enabled' = 'false',
     -- clean
     'clean.async.enabled' = 'false'
   );
   insert into enterprise select * from source_table;
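   (A quick sanity check after the bulk load, as a sketch: a batch read of the
   Hudi table through the same SQL session; the count should match the MySQL
   side.)

   -- hypothetical verification query, run in batch mode after the job finishes
   SELECT COUNT(*) FROM enterprise;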
   -----------------------------------------------------
   The streaming insert job is:
   CREATE TABLE source_table (
     id                DECIMAL(20,0),
     graph_id          BIGINT,
     base              STRING,
     name              STRING,
     legal_person_id   BIGINT,
     legal_person_name STRING,
     legal_person_type INT,
     code              STRING,
     reg_number        STRING,
     company_org_type  STRING,
     reg_location      STRING,
     establish_date    DATE,
     from_date         DATE,
     to_date           DATE,
     business_scope    STRING,
     reg_institute     STRING,
     approved_date     DATE,
     reg_status        STRING,
     reg_capital       STRING,
     org_number        STRING,
     source_flag       STRING,
     crawled_time      TIMESTAMP(3),
     deleted           TINYINT,
     create_time       TIMESTAMP(3),
     update_time       TIMESTAMP(3),
     op_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'kafka',
     'topic' = '56736.json.prism.enterprise',
     'properties.bootstrap.servers' = '10.99.202.90:9092,10.99.206.80:9092,10.99.199.2:9092',
     'properties.group.id' = 'hudi-demo-job',
     'scan.startup.mode' = 'earliest-offset',
     -- canal
     'format' = 'canal-json',
     'canal-json.ignore-parse-errors' = 'true',
     'canal-json.encode.decimal-as-plain-number' = 'true'
   );
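   (For context: op_ts here is read from the canal-json format's
   ingestion-timestamp metadata, which corresponds to the ts field of the
   Canal envelope, so it differs from the CURRENT_TIMESTAMP-based op_ts used
   in the bulk load. An illustrative Canal record is sketched below as a
   comment; the field names follow the Canal JSON layout, the values are made
   up:)

   -- {"data":[{"id":"136021795", ...}], "type":"UPDATE", "database":"prism",
   --  "table":"enterprise", "ts":1696772703000, "es":1696772702000}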
   create table enterprise(
     id                DECIMAL(20,0),
     graph_id          BIGINT,
     base              STRING,
     name              STRING,
     legal_person_id   BIGINT,
     legal_person_name STRING,
     legal_person_type INT,
     code              STRING,
     reg_number        STRING,
     company_org_type  STRING,
     reg_location      STRING,
     establish_date    DATE,
     from_date         DATE,
     to_date           DATE,
     business_scope    STRING,
     reg_institute     STRING,
     approved_date     DATE,
     reg_status        STRING,
     reg_capital       STRING,
     org_number        STRING,
     source_flag       STRING,
     crawled_time      TIMESTAMP(3),
     deleted           TINYINT,
     create_time       TIMESTAMP(3),
     update_time       TIMESTAMP(3),
     op_ts             TIMESTAMP(3),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'hudi',
     'path' = 'obs://hadoop-obs/hudi_ods/enterprise011',
     'table.type' = 'MERGE_ON_READ',
     -- cdc
     'changelog.enabled' = 'true',
     -- index
     'index.type' = 'BUCKET', 
     'hoodie.bucket.index.num.buckets' = '128',
     -- write
     'write.tasks' = '2',
     'write.task.max.size' = '512',
     'write.batch.size' = '4',
     'write.log_block.size' = '8',
     'write.precombine' = 'true',
     'write.precombine.field' = 'op_ts',
     -- compaction
     'compaction.schedule.enabled' = 'true',
     'compaction.async.enabled' = 'false',
     'compaction.delta_commits' = '3',
     -- clean
     'clean.async.enabled' = 'true',
     'clean.retain_commits' = '2880'
   );
   insert into enterprise select * from source_table;
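   (To check what the changelog-enabled table emits before compaction, a
   streaming read can be issued against the same table. A sketch using dynamic
   table options; the hint keys are standard Hudi Flink read options and the
   start commit is illustrative:)

   -- hypothetical streaming read of the table defined above
   SELECT id, op_ts FROM enterprise
   /*+ OPTIONS('read.streaming.enabled'='true', 'read.start-commit'='earliest') */;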
   --------------------------------------------------------------
   Got this exception during compaction:
   2023-10-08 21:45:03.232 [ERROR] [compact_task (31/128)#0] (HoodieMergeHandle.java:325) - Error writing record HoodieRecord{key=HoodieKey { recordKey=136021795 partitionPath=}, currentLocation='null', newLocation='null'}
   java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
        at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:217) ~[hudi-1.0.jar:?]
        at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:209) ~[hudi-1.0.jar:?]
        at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:407) ~[hudi-1.0.jar:?]
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:184) ~[hudi-1.0.jar:?]
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:158) ~[hudi-1.0.jar:?]
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:140) ~[hudi-1.0.jar:?]
        at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:310) ~[hudi-1.0.jar:?]
        at org.apache.hudi.io.storage.HoodieBaseParquetWriter.write(HoodieBaseParquetWriter.java:80) ~[hudi-1.0.jar:?]
        at org.apache.hudi.io.storage.HoodieAvroParquetWriter.writeAvro(HoodieAvroParquetWriter.java:76) ~[hudi-1.0.jar:?]
        at org.apache.hudi.io.HoodieMergeHandle.writeToFile(HoodieMergeHandle.java:384) ~[hudi-1.0.jar:?]
        at org.apache.hudi.io.HoodieMergeHandle.writeRecord(HoodieMergeHandle.java:313) ~[hudi-1.0.jar:?]
        at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:294) ~[hudi-1.0.jar:?]
        at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397) ~[hudi-1.0.jar:?]
        at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:405) ~[hudi-1.0.jar:?]
        at org.apache.hudi.table.action.commit.FlinkMergeHelper.runMerge(FlinkMergeHelper.java:135) ~[hudi-1.0.jar:?]
        at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:375) ~[hudi-1.0.jar:?]
        at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:366) ~[hudi-1.0.jar:?]
        at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:227) ~[hudi-1.0.jar:?]
        at org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:133) ~[hudi-1.0.jar:?]
        at org.apache.hudi.sink.compact.CompactOperator.processElement(CompactOperator.java:124) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807) ~[hudi-1.0.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756) ~[hudi-1.0.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [hudi-1.0.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [hudi-1.0.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) [hudi-1.0.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [hudi-1.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
   

