li-ang-666 opened a new issue, #9828:
URL: https://github.com/apache/hudi/issues/9828
The Flink SQL for the initial `bulk_insert` job is:
CREATE TABLE source_table(
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
--op_ts as CAST(NOW() AS TIMESTAMP(0)),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxxxx:3306/company_base',
'table-name' = 'ratio_path_company',
'username' = 'xxxxl',
'password' = 'xxxx',
'scan.partition.column' = 'id',
'scan.partition.num' = '40000',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '400000000',
'scan.fetch-size' = '1024'
);
create table ratio_path_company(
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
--op_ts TIMESTAMP(0),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company020',
'table.type' = 'MERGE_ON_READ',
-- index
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '256',
-- write
'write.operation' = 'bulk_insert',
'write.bulk_insert.shuffle_input' = 'false',
'write.bulk_insert.sort_input' = 'false',
'write.tasks' = '128'
);
insert into ratio_path_company select * from source_table;
-------------------------------------------------------------------------
while the Flink SQL for the subsequent streaming `insert` job (reading CDC from Kafka) is:
CREATE TABLE source_table (
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
--op_ts TIMESTAMP(0) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'e1d4c.json.prism_shareholder_path.ratio_path_company',
'properties.bootstrap.servers' =
'10.99.202.90:9092,10.99.206.80:9092,10.99.199.2:9092',
'properties.group.id' = 'demo-job',
'scan.startup.mode' = 'earliest-offset',
-- canal
'format' = 'canal-json',
'canal-json.ignore-parse-errors' = 'true',
'canal-json.encode.decimal-as-plain-number' = 'true'
);
create table ratio_path_company(
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
--op_ts TIMESTAMP(0),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company020',
'table.type' = 'MERGE_ON_READ',
-- cdc
'changelog.enabled' = 'true',
-- index
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '256',
-- write
'write.tasks' = '8',
'write.task.max.size' = '512',
'write.batch.size' = '12',
'write.merge.max_memory' = '28',
'write.log_block.size' = '128',
--'write.precombine' = 'true',
--'precombine.field' = 'op_ts',
-- compaction
'compaction.tasks' = '8',
'compaction.schedule.enabled' = 'true',
'compaction.async.enabled' = 'true',
'compaction.max_memory' = '128',
'compaction.delta_commits' = '2',
-- clean
'clean.async.enabled' = 'true',
'clean.policy' = 'KEEP_LATEST_BY_HOURS',
'clean.retain_hours' = '72'
);
insert into ratio_path_company select * from source_table;
-----------------------------------------------------------------
The exception thrown by the streaming `insert` job (while compaction reads back the parquet files written by `bulk_insert`) is:
2023-10-07 10:42:42.364 [ERROR] [consumer-thread-1]
(BoundedInMemoryExecutor.java:139) - error consuming records
org.apache.hudi.exception.HoodieException: operation has failed
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_302]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_302]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_302]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
Caused by: org.apache.hudi.exception.HoodieException: unable to read next
record from parquet file
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_302]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_302]
... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
value at 1 in block 0 in file
obs://hadoop-obs/ods_hudi/ratio_path_company020/00000232-f23e-46d6-a23a-13781e068df8_104-128-0_20231007102621349.parquet
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_302]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_302]
... 4 more
Caused by: java.lang.UnsupportedOperationException:
org.apache.parquet.avro.AvroConverters$FieldLongConverter
at
org.apache.parquet.io.api.PrimitiveConverter.addBinary(PrimitiveConverter.java:70)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
~[blob_p-c9a468f216136ebc900a58b5e92312a63db71da8-ff5767888ef648ae69e4f572158977b8:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_302]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_302]
... 4 more
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]