li-ang-666 opened a new issue, #9804:
URL: https://github.com/apache/hudi/issues/9804

   [offline insert]
   CREATE TABLE source_table(
     id                         DECIMAL(20, 0),
     company_id                 BIGINT,
     shareholder_id             STRING,
     shareholder_entity_type    SMALLINT,
     shareholder_name_id        BIGINT,
     investment_ratio_total     DECIMAL(24, 12),
     is_controller              SMALLINT,
     is_ultimate                SMALLINT,
     is_big_shareholder         SMALLINT,
     is_controlling_shareholder SMALLINT,
     equity_holding_path        STRING,
     create_time                TIMESTAMP(0),
     update_time                TIMESTAMP(0),
     is_deleted                 SMALLINT,
     op_ts as CAST(NOW() AS TIMESTAMP(0)),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://10.99.131.246:3306/company_base',
     'table-name' = 'ratio_path_company',
     'username' = 'jdhw_d_data_dml',
     'password' = '2s0^tFa4SLrp72',
     'scan.partition.column' = 'id',
     'scan.partition.num' = '40000',
     'scan.partition.lower-bound' = '1',
     'scan.partition.upper-bound' = '400000000',
     'scan.fetch-size' = '1024'
   );
   create table ratio_path_company(
     id                         DECIMAL(20, 0),
     company_id                 BIGINT,
     shareholder_id             STRING,
     shareholder_entity_type    SMALLINT,
     shareholder_name_id        BIGINT,
     investment_ratio_total     DECIMAL(24, 12),
     is_controller              SMALLINT,
     is_ultimate                SMALLINT,
     is_big_shareholder         SMALLINT,
     is_controlling_shareholder SMALLINT,
     equity_holding_path        STRING,
     create_time                TIMESTAMP(0),
     update_time                TIMESTAMP(0),
     is_deleted                 SMALLINT,
     op_ts                      TIMESTAMP(0),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'hudi',
     'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company004',
     'table.type' = 'MERGE_ON_READ',
     -- cdc
     'changelog.enabled' = 'true',
     -- index
     'index.type' = 'bucket',
     'hoodie.bucket.index.num.buckets' = '256',
     -- write
     'write.operation' = 'bulk_insert',
     'write.bulk_insert.shuffle_input' = 'false',
     'write.bulk_insert.sort_input' = 'false',
     'write.tasks' = '128',
     'write.precombine' = 'true',
     'precombine.field' = 'op_ts',
     -- compaction
     'compaction.schedule.enabled' = 'false',
     'compaction.async.enabled' = 'false',
     -- clean
     'clean.async.enabled' = 'false'
   );
   insert into ratio_path_company select * from source_table;
   --------------------------------------
   [online insert]
   CREATE TABLE source_table (
     id                         DECIMAL(20, 0),
     company_id                 BIGINT,
     shareholder_id             STRING,
     shareholder_entity_type    SMALLINT,
     shareholder_name_id        BIGINT,
     investment_ratio_total     DECIMAL(24, 12),
     is_controller              SMALLINT,
     is_ultimate                SMALLINT,
     is_big_shareholder         SMALLINT,
     is_controlling_shareholder SMALLINT,
     equity_holding_path        STRING,
     create_time                TIMESTAMP(0),
     update_time                TIMESTAMP(0),
     is_deleted                 SMALLINT,
     op_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'kafka',
     'topic' = 'e1d4c.json.prism_shareholder_path.ratio_path_company',
     'properties.bootstrap.servers' = 
'10.99.202.90:9092,10.99.206.80:9092,10.99.199.2:9092',
     'properties.group.id' = 'demo-job',
     'scan.startup.mode' = 'earliest-offset',
     -- canal
     'format' = 'canal-json',
     'canal-json.ignore-parse-errors' = 'true',
     'canal-json.encode.decimal-as-plain-number' = 'true'
   );
   create table ratio_path_company(
     id                         DECIMAL(20, 0),
     company_id                 BIGINT,
     shareholder_id             STRING,
     shareholder_entity_type    SMALLINT,
     shareholder_name_id        BIGINT,
     investment_ratio_total     DECIMAL(24, 12),
     is_controller              SMALLINT,
     is_ultimate                SMALLINT,
     is_big_shareholder         SMALLINT,
     is_controlling_shareholder SMALLINT,
     equity_holding_path        STRING,
     create_time                TIMESTAMP(0),
     update_time                TIMESTAMP(0),
     is_deleted                 SMALLINT,
     op_ts                      TIMESTAMP(0),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'hudi',
     'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company004',
     'table.type' = 'MERGE_ON_READ',
     -- cdc
     'changelog.enabled' = 'true',
     -- index
     'index.type' = 'bucket', 
     'hoodie.bucket.index.num.buckets' = '256',
     -- write
     'write.tasks' = '8',
     'write.task.max.size' = '512',
     'write.batch.size' = '12',
     'write.merge.max_memory' = '28',
     'write.log_block.size' = '128', 
     'write.precombine' = 'true',
     'precombine.field' = 'op_ts',
     -- compaction
     'compaction.tasks' = '8',
     'compaction.schedule.enabled' = 'true',
     'compaction.async.enabled' = 'false',
     'compaction.max_memory' = '128',
     'compaction.delta_commits' = '3',
     -- clean
     'clean.async.enabled' = 'true',
     'clean.policy' = 'KEEP_LATEST_BY_HOURS',
     'clean.retain_hours' = '72'
   );
   insert into ratio_path_company select * from source_table;
   --------------------
   The job fails with the following exception:
   Caused by: java.lang.IllegalArgumentException: INT96 is deprecated. As 
interim enable READ_INT96_AS_FIXED  flag to read as byte array.
        at 
org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:316)
 ~[hudi-1.0.jar:?]
        at 
org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:298)
 ~[hudi-1.0.jar:?]
        at 
org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName$7.convert(PrimitiveType.java:341)
 ~[hudi-1.0.jar:?]
        at 
org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:297)
 ~[hudi-1.0.jar:?]
        at 
org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:275)
 ~[hudi-1.0.jar:?]
        at 
org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
 ~[hudi-1.0.jar:?]
        at 
org.apache.hudi.common.table.TableSchemaResolver.convertParquetSchemaToAvro(TableSchemaResolver.java:293)
 ~[hudi-1.0.jar:?]
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:116)
 ~[hudi-1.0.jar:?]
        at 
org.apache.hudi.util.CompactionUtil.inferChangelogMode(CompactionUtil.java:145) 
~[hudi-1.0.jar:?]
        at 
org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.<init>(HoodieFlinkCompactor.java:180)
 ~[hudi-1.0.jar:?]
        at 
org.apache.hudi.sink.compact.HoodieFlinkCompactor.main(HoodieFlinkCompactor.java:75)
 ~[hudi-1.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_302]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_302]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_302]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_302]
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[hudi-1.0.jar:?]
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[hudi-1.0.jar:?]
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) 
~[hudi-1.0.jar:?]
        at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301)
 ~[hudi-1.0.jar:?]
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to