li-ang-666 opened a new issue, #9832:
URL: https://github.com/apache/hudi/issues/9832
The bulk_insert job (initial load from MySQL via the JDBC connector) is:
CREATE TABLE source_table (
  id DECIMAL(20,0),
  graph_id BIGINT,
  base STRING,
  name STRING,
  legal_person_id BIGINT,
  legal_person_name STRING,
  legal_person_type INT,
  code STRING,
  reg_number STRING,
  company_org_type STRING,
  reg_location STRING,
  establish_date DATE,
  from_date DATE,
  to_date DATE,
  business_scope STRING,
  reg_institute STRING,
  approved_date DATE,
  reg_status STRING,
  reg_capital STRING,
  org_number STRING,
  source_flag STRING,
  crawled_time TIMESTAMP(3),
  deleted TINYINT,
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3),
  op_ts AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxxxx:3306/prism?useSSL=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false',
  'table-name' = 'enterprise',
  'username' = 'xxxxx',
  'password' = '2s0^xxxxx',
  'scan.partition.column' = 'id',
  'scan.partition.num' = '300000',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '3000000000',
  'scan.fetch-size' = '1024'
);
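For context on the scan options above: the Flink JDBC source splits the range ['scan.partition.lower-bound', 'scan.partition.upper-bound'] into 'scan.partition.num' parallel splits, so each split here covers about (3000000000 - 1 + 1) / 300000 = 10000 ids. A sketch of the query each split effectively runs against MySQL (bounds illustrative, first split shown):

SELECT * FROM enterprise WHERE id BETWEEN 1 AND 10000;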
CREATE TABLE enterprise (
  id DECIMAL(20,0),
  graph_id BIGINT,
  base STRING,
  name STRING,
  legal_person_id BIGINT,
  legal_person_name STRING,
  legal_person_type INT,
  code STRING,
  reg_number STRING,
  company_org_type STRING,
  reg_location STRING,
  establish_date DATE,
  from_date DATE,
  to_date DATE,
  business_scope STRING,
  reg_institute STRING,
  approved_date DATE,
  reg_status STRING,
  reg_capital STRING,
  org_number STRING,
  source_flag STRING,
  crawled_time TIMESTAMP(3),
  deleted TINYINT,
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3),
  op_ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'obs://hadoop-obs/hudi_ods/enterprise011',
  'table.type' = 'MERGE_ON_READ',
  -- cdc
  'changelog.enabled' = 'true',
  -- index
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '128',
  -- write
  'write.operation' = 'bulk_insert',
  'write.bulk_insert.shuffle_input' = 'false',
  'write.bulk_insert.sort_input' = 'false',
  'write.tasks' = '64',
  'write.precombine' = 'true',
  'write.precombine.field' = 'op_ts',
  -- compaction
  'compaction.schedule.enabled' = 'false',
  'compaction.async.enabled' = 'false',
  -- clean
  'clean.async.enabled' = 'false'
);

INSERT INTO enterprise SELECT * FROM source_table;
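A minimal sanity check after the bulk_insert job finishes could be a plain row count (sketch; assumes the Flink SQL client, session settings illustrative):

SET 'execution.runtime-mode' = 'batch';
SELECT count(*) FROM enterprise;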
-----------------------------------------------------
The streaming insert job (CDC ingestion from Kafka in canal-json format) is:
CREATE TABLE source_table (
  id DECIMAL(20,0),
  graph_id BIGINT,
  base STRING,
  name STRING,
  legal_person_id BIGINT,
  legal_person_name STRING,
  legal_person_type INT,
  code STRING,
  reg_number STRING,
  company_org_type STRING,
  reg_location STRING,
  establish_date DATE,
  from_date DATE,
  to_date DATE,
  business_scope STRING,
  reg_institute STRING,
  approved_date DATE,
  reg_status STRING,
  reg_capital STRING,
  org_number STRING,
  source_flag STRING,
  crawled_time TIMESTAMP(3),
  deleted TINYINT,
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3),
  op_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = '56736.json.prism.enterprise',
  'properties.bootstrap.servers' = '10.99.202.90:9092,10.99.206.80:9092,10.99.199.2:9092',
  'properties.group.id' = 'hudi-demo-job',
  'scan.startup.mode' = 'earliest-offset',
  -- canal
  'format' = 'canal-json',
  'canal-json.ignore-parse-errors' = 'true',
  'canal-json.encode.decimal-as-plain-number' = 'true'
);
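For reference, op_ts here is taken from the ts field of the Canal record ('value.ingestion-timestamp' is the time Canal processed the change, per the Flink canal-json docs). A sketch of the kind of message this source consumes (field values illustrative):

{"data": [{"id": "136021795", "deleted": "0", ...}], "type": "UPDATE", "ts": 1696772400000, "database": "prism", "table": "enterprise"}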
CREATE TABLE enterprise (
  id DECIMAL(20,0),
  graph_id BIGINT,
  base STRING,
  name STRING,
  legal_person_id BIGINT,
  legal_person_name STRING,
  legal_person_type INT,
  code STRING,
  reg_number STRING,
  company_org_type STRING,
  reg_location STRING,
  establish_date DATE,
  from_date DATE,
  to_date DATE,
  business_scope STRING,
  reg_institute STRING,
  approved_date DATE,
  reg_status STRING,
  reg_capital STRING,
  org_number STRING,
  source_flag STRING,
  crawled_time TIMESTAMP(3),
  deleted TINYINT,
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3),
  op_ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'obs://hadoop-obs/hudi_ods/enterprise011',
  'table.type' = 'MERGE_ON_READ',
  -- cdc
  'changelog.enabled' = 'true',
  -- index
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '128',
  -- write
  'write.tasks' = '2',
  'write.task.max.size' = '512',
  'write.batch.size' = '4',
  'write.log_block.size' = '8',
  'write.precombine' = 'true',
  'write.precombine.field' = 'op_ts',
  -- compaction
  'compaction.schedule.enabled' = 'true',
  'compaction.async.enabled' = 'false',
  'compaction.delta_commits' = '3',
  -- clean
  'clean.async.enabled' = 'true',
  'clean.retain_commits' = '2880'
);

INSERT INTO enterprise SELECT * FROM source_table;
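A note on the compaction options: with 'compaction.schedule.enabled' = 'true' and 'compaction.async.enabled' = 'false', my understanding is that the streaming job only schedules a compaction plan every 'compaction.delta_commits' (= 3) delta commits, and the plan is executed separately, e.g. by the offline Flink compactor that ships with the bundle (sketch; jar name and flags depend on the Hudi version): flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle.jar --path obs://hadoop-obs/hudi_ods/enterprise011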
--------------------------------------------------------------
Got this exception during compaction:
2023-10-08 21:45:03.232 [ERROR] [compact_task (31/128)#0] (HoodieMergeHandle.java:325) - Error writing record HoodieRecord{key=HoodieKey { recordKey=136021795 partitionPath=}, currentLocation='null', newLocation='null'}
java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
    at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:217) ~[hudi-1.0.jar:?]
    at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:209) ~[hudi-1.0.jar:?]
    at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:407) ~[hudi-1.0.jar:?]
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:184) ~[hudi-1.0.jar:?]
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:158) ~[hudi-1.0.jar:?]
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:140) ~[hudi-1.0.jar:?]
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:310) ~[hudi-1.0.jar:?]
    at org.apache.hudi.io.storage.HoodieBaseParquetWriter.write(HoodieBaseParquetWriter.java:80) ~[hudi-1.0.jar:?]
    at org.apache.hudi.io.storage.HoodieAvroParquetWriter.writeAvro(HoodieAvroParquetWriter.java:76) ~[hudi-1.0.jar:?]
    at org.apache.hudi.io.HoodieMergeHandle.writeToFile(HoodieMergeHandle.java:384) ~[hudi-1.0.jar:?]
    at org.apache.hudi.io.HoodieMergeHandle.writeRecord(HoodieMergeHandle.java:313) ~[hudi-1.0.jar:?]
    at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:294) ~[hudi-1.0.jar:?]
    at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397) ~[hudi-1.0.jar:?]
    at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:405) ~[hudi-1.0.jar:?]
    at org.apache.hudi.table.action.commit.FlinkMergeHelper.runMerge(FlinkMergeHelper.java:135) ~[hudi-1.0.jar:?]
    at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:375) ~[hudi-1.0.jar:?]
    at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:366) ~[hudi-1.0.jar:?]
    at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:227) ~[hudi-1.0.jar:?]
    at org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:133) ~[hudi-1.0.jar:?]
    at org.apache.hudi.sink.compact.CompactOperator.processElement(CompactOperator.java:124) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807) ~[hudi-1.0.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756) ~[hudi-1.0.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [hudi-1.0.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [hudi-1.0.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) [hudi-1.0.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [hudi-1.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]