li-ang-666 opened a new issue, #9804:
URL: https://github.com/apache/hudi/issues/9804
[offline insert]
-- Offline source: Flink JDBC connector reading the MySQL table
-- company_base.ratio_path_company.
CREATE TABLE source_table (
    id                         DECIMAL(20, 0),
    company_id                 BIGINT,
    shareholder_id             STRING,
    shareholder_entity_type    SMALLINT,
    shareholder_name_id        BIGINT,
    investment_ratio_total     DECIMAL(24, 12),
    is_controller              SMALLINT,
    is_ultimate                SMALLINT,
    is_big_shareholder         SMALLINT,
    is_controlling_shareholder SMALLINT,
    equity_holding_path        STRING,
    create_time                TIMESTAMP(0),
    update_time                TIMESTAMP(0),
    is_deleted                 SMALLINT,
    -- Computed column: NOW() cast to second precision.
    op_ts AS CAST(NOW() AS TIMESTAMP(0)),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.99.131.246:3306/company_base',
    'table-name' = 'ratio_path_company',
    -- NOTE(review): credentials are embedded in plain text in a public
    -- report; consider redacting and rotating them.
    'username' = 'jdhw_d_data_dml',
    'password' = '2s0^tFa4SLrp72',
    -- Partitioned scan on id: 40000 partitions, bounds 1 .. 400000000.
    'scan.partition.column' = 'id',
    'scan.partition.num' = '40000',
    'scan.partition.lower-bound' = '1',
    'scan.partition.upper-bound' = '400000000',
    'scan.fetch-size' = '1024'
);
-- Offline sink: Hudi MERGE_ON_READ table that receives the initial load
-- through the bulk_insert write operation.
CREATE TABLE ratio_path_company (
    id                         DECIMAL(20, 0),
    company_id                 BIGINT,
    shareholder_id             STRING,
    shareholder_entity_type    SMALLINT,
    shareholder_name_id        BIGINT,
    investment_ratio_total     DECIMAL(24, 12),
    is_controller              SMALLINT,
    is_ultimate                SMALLINT,
    is_big_shareholder         SMALLINT,
    is_controlling_shareholder SMALLINT,
    equity_holding_path        STRING,
    -- NOTE(review): the three TIMESTAMP(0) columns below are the likely
    -- origin of the INT96 error reported further down; Hudi's Flink row
    -- writer presumably stores timestamps as INT64 only for precision 3
    -- (and 6 in newer releases) -- TODO confirm against the Hudi version in use.
    create_time                TIMESTAMP(0),
    update_time                TIMESTAMP(0),
    is_deleted                 SMALLINT,
    op_ts                      TIMESTAMP(0),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hudi',
    'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company004',
    'table.type' = 'MERGE_ON_READ',

    -- Changelog mode.
    'changelog.enabled' = 'true',

    -- Bucket index with 256 buckets.
    'index.type' = 'bucket',
    'hoodie.bucket.index.num.buckets' = '256',

    -- Write path: bulk_insert with input shuffle and sort switched off,
    -- 128 write tasks, precombine on op_ts.
    'write.operation' = 'bulk_insert',
    'write.bulk_insert.shuffle_input' = 'false',
    'write.bulk_insert.sort_input' = 'false',
    'write.tasks' = '128',
    'write.precombine' = 'true',
    'precombine.field' = 'op_ts',

    -- Compaction: scheduling and async execution both disabled for this job.
    'compaction.schedule.enabled' = 'false',
    'compaction.async.enabled' = 'false',

    -- Cleaning: async cleaning disabled for this job.
    'clean.async.enabled' = 'false'
);
-- Offline load: copy every row of source_table into the Hudi table.
-- Columns are listed explicitly on both sides (instead of SELECT * with an
-- implicit positional mapping) so the job does not silently mis-map data if
-- either table's column order changes.
INSERT INTO ratio_path_company (
    id,
    company_id,
    shareholder_id,
    shareholder_entity_type,
    shareholder_name_id,
    investment_ratio_total,
    is_controller,
    is_ultimate,
    is_big_shareholder,
    is_controlling_shareholder,
    equity_holding_path,
    create_time,
    update_time,
    is_deleted,
    op_ts
)
SELECT
    id,
    company_id,
    shareholder_id,
    shareholder_entity_type,
    shareholder_name_id,
    investment_ratio_total,
    is_controller,
    is_ultimate,
    is_big_shareholder,
    is_controlling_shareholder,
    equity_holding_path,
    create_time,
    update_time,
    is_deleted,
    op_ts
FROM source_table;
--------------------------------------
[online insert]
-- Online source: Kafka topic carrying canal-json change records for
-- ratio_path_company, consumed from the earliest offset.
CREATE TABLE source_table (
    id                         DECIMAL(20, 0),
    company_id                 BIGINT,
    shareholder_id             STRING,
    shareholder_entity_type    SMALLINT,
    shareholder_name_id        BIGINT,
    investment_ratio_total     DECIMAL(24, 12),
    is_controller              SMALLINT,
    is_ultimate                SMALLINT,
    is_big_shareholder         SMALLINT,
    is_controlling_shareholder SMALLINT,
    equity_holding_path        STRING,
    create_time                TIMESTAMP(0),
    update_time                TIMESTAMP(0),
    is_deleted                 SMALLINT,
    -- Read-only metadata column taken from the format's ingestion timestamp.
    op_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'e1d4c.json.prism_shareholder_path.ratio_path_company',
    'properties.bootstrap.servers' = '10.99.202.90:9092,10.99.206.80:9092,10.99.199.2:9092',
    'properties.group.id' = 'demo-job',
    'scan.startup.mode' = 'earliest-offset',

    -- Canal change-log format; unparsable records are skipped.
    'format' = 'canal-json',
    'canal-json.ignore-parse-errors' = 'true',
    'canal-json.encode.decimal-as-plain-number' = 'true'
);
-- Online sink: the same Hudi MERGE_ON_READ table (same path as the offline
-- load), now written by the streaming job with compaction scheduling and
-- async cleaning turned on.
CREATE TABLE ratio_path_company (
    id                         DECIMAL(20, 0),
    company_id                 BIGINT,
    shareholder_id             STRING,
    shareholder_entity_type    SMALLINT,
    shareholder_name_id        BIGINT,
    investment_ratio_total     DECIMAL(24, 12),
    is_controller              SMALLINT,
    is_ultimate                SMALLINT,
    is_big_shareholder         SMALLINT,
    is_controlling_shareholder SMALLINT,
    equity_holding_path        STRING,
    create_time                TIMESTAMP(0),
    update_time                TIMESTAMP(0),
    is_deleted                 SMALLINT,
    op_ts                      TIMESTAMP(0),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hudi',
    'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company004',
    'table.type' = 'MERGE_ON_READ',

    -- Changelog mode.
    'changelog.enabled' = 'true',

    -- Bucket index with 256 buckets (matches the offline load).
    'index.type' = 'bucket',
    'hoodie.bucket.index.num.buckets' = '256',

    -- Write path: 8 write tasks with explicit memory/batch sizing,
    -- precombine on op_ts.
    'write.tasks' = '8',
    'write.task.max.size' = '512',
    'write.batch.size' = '12',
    'write.merge.max_memory' = '28',
    'write.log_block.size' = '128',
    'write.precombine' = 'true',
    'precombine.field' = 'op_ts',

    -- Compaction: scheduling enabled, async execution disabled.
    'compaction.tasks' = '8',
    'compaction.schedule.enabled' = 'true',
    'compaction.async.enabled' = 'false',
    'compaction.max_memory' = '128',
    'compaction.delta_commits' = '3',

    -- Cleaning: async, KEEP_LATEST_BY_HOURS with 72 retained hours.
    'clean.async.enabled' = 'true',
    'clean.policy' = 'KEEP_LATEST_BY_HOURS',
    'clean.retain_hours' = '72'
);
-- Online ingest: stream every change record from source_table into the Hudi
-- table. Columns are listed explicitly on both sides (instead of SELECT *
-- with an implicit positional mapping) so the job does not silently mis-map
-- data if either table's column order changes.
INSERT INTO ratio_path_company (
    id,
    company_id,
    shareholder_id,
    shareholder_entity_type,
    shareholder_name_id,
    investment_ratio_total,
    is_controller,
    is_ultimate,
    is_big_shareholder,
    is_controlling_shareholder,
    equity_holding_path,
    create_time,
    update_time,
    is_deleted,
    op_ts
)
SELECT
    id,
    company_id,
    shareholder_id,
    shareholder_entity_type,
    shareholder_name_id,
    investment_ratio_total,
    is_controller,
    is_ultimate,
    is_big_shareholder,
    is_controlling_shareholder,
    equity_holding_path,
    create_time,
    update_time,
    is_deleted,
    op_ts
FROM source_table;
--------------------
The following exception is thrown (per the stack trace, from `HoodieFlinkCompactor.main`):
Caused by: java.lang.IllegalArgumentException: INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.
    at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:316) ~[hudi-1.0.jar:?]
    at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:298) ~[hudi-1.0.jar:?]
    at org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName$7.convert(PrimitiveType.java:341) ~[hudi-1.0.jar:?]
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:297) ~[hudi-1.0.jar:?]
    at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:275) ~[hudi-1.0.jar:?]
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264) ~[hudi-1.0.jar:?]
    at org.apache.hudi.common.table.TableSchemaResolver.convertParquetSchemaToAvro(TableSchemaResolver.java:293) ~[hudi-1.0.jar:?]
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:116) ~[hudi-1.0.jar:?]
    at org.apache.hudi.util.CompactionUtil.inferChangelogMode(CompactionUtil.java:145) ~[hudi-1.0.jar:?]
    at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.<init>(HoodieFlinkCompactor.java:180) ~[hudi-1.0.jar:?]
    at org.apache.hudi.sink.compact.HoodieFlinkCompactor.main(HoodieFlinkCompactor.java:75) ~[hudi-1.0.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_302]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_302]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_302]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_302]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[hudi-1.0.jar:?]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[hudi-1.0.jar:?]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[hudi-1.0.jar:?]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[hudi-1.0.jar:?]
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]