voonhous commented on issue #9256:
URL: https://github.com/apache/hudi/issues/9256#issuecomment-1647280925

   @jlloh I'm able to write to a newly added column using the following steps:
   
   # Create a Hudi-on-Flink append-only job
   
   ```sql
   -- Source table backed by the `datagen` connector
   -- (50 rows per second, `val` strings of length 99).
   CREATE TABLE input_table (
       `val`        STRING,
       `event_time` TIMESTAMP(3),
       `partition`  BIGINT,
       `offset`     BIGINT,
       `map_col`    MAP<STRING, INT>
   ) WITH (
       'connector'         = 'datagen',
       'fields.val.length' = '99',
       'rows-per-second'   = '50'
   );
   
   -- Hudi sink table, partitioned by `dt`; clustering and Hive sync options
   -- are set in the WITH clause below.
   CREATE TABLE wt_test_hudi (
       `val`        STRING,
       `event_time` TIMESTAMP(3),
       `partition`  BIGINT,
       `offset`     BIGINT,
       `map_col`    MAP<STRING, INT>,
       `dt`         STRING
   )
   PARTITIONED BY (dt)
   WITH (
       -- Table location and write settings
       'connector' = 'hudi',
       'path' = 'hdfs://my/path/to/hfse_flink_writes',
       'table.type' = 'COPY_ON_WRITE',
       'write.operation' = 'insert',
       'write.tasks' = '2',
       'hoodie.parquet.small.file.limit' = '104857600',
       'hoodie.parquet.max.file.size' = '268435456',
       'hoodie.parquet.compression.ratio' = '0.1',

       -- Record key / partition path
       'hoodie.datasource.write.recordkey.field' = 'val',
       'hoodie.datasource.write.hive_style_partitioning' = 'true',
       'hoodie.datasource.write.partitionpath.field' = 'dt',
       'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',

       -- Clustering
       'clustering.schedule.enabled' = 'true',
       'clustering.async.enabled' = 'true',
       'clustering.plan.strategy.small.file.limit' = '1800', -- 1800MB
       'clustering.plan.strategy.target.file.max.bytes' = '2147483648', -- 2GB
       'clustering.tasks' = '5',
       'clustering.delta_commits' = '5',

       -- Hive sync
       'hive_sync.enable' = 'true',
       'hive_sync.metastore.uris' = 'thrift://hive.metastore.io:9083',
       'hive_sync.use_jdbc' = 'false',
       'hive_sync.mode' = 'hms',
       'hive_sync.db' = 'dev_hudi',
       'hive_sync.table' = 'hfse_flink_writes'
   );
   
   -- Initial job: copy every source column into the Hudi sink.
   -- `array_col` is intentionally absent here: it does not exist in
   -- input_table or wt_test_hudi until the Spark ALTER TABLE step further
   -- down, so selecting it at this stage would fail validation
   -- (unknown column, and 7 selected columns against a 6-column sink).
   INSERT INTO wt_test_hudi
   SELECT
       `val`,
       `event_time`,
       `partition`,
       `offset`,
       `map_col`,
       DATE_FORMAT(event_time, 'yyyy-MM-dd')  -- value for the `dt` partition column
   FROM input_table;
   ```
   
   # Stop the Flink job (with or without savepoint) + alter table on Spark
   In a Spark session, update the table schema:
   
   ```sql
   -- Add `array_col`, positioned directly after `map_col`.
   ALTER TABLE dev_hudi.hfse_flink_writes
   ADD COLUMN (
       `array_col` ARRAY<INT> AFTER `map_col`
   );
   ```
   
   This will add the column `array_col` after `map_col`.
   
   # Update Hudi-on-Flink append-only job with the new schema
   
   Update the Flink-SQL and start the job again.
   
   ```sql
   -- Source table backed by the `datagen` connector, now including `array_col`
   -- (50 rows per second, `val` strings of length 99).
   CREATE TABLE input_table (
       `val`        STRING,
       `event_time` TIMESTAMP(3),
       `partition`  BIGINT,
       `offset`     BIGINT,
       `map_col`    MAP<STRING, INT>,
       `array_col`  ARRAY<INT>
   ) WITH (
       'connector'         = 'datagen',
       'fields.val.length' = '99',
       'rows-per-second'   = '50'
   );
   
   -- Hudi sink table with `array_col` declared after `map_col`, partitioned
   -- by `dt`; clustering and Hive sync options are set in the WITH clause below.
   CREATE TABLE wt_test_hudi (
       `val`        STRING,
       `event_time` TIMESTAMP(3),
       `partition`  BIGINT,
       `offset`     BIGINT,
       `map_col`    MAP<STRING, INT>,
       `array_col`  ARRAY<INT>,
       `dt`         STRING
   )
   PARTITIONED BY (dt)
   WITH (
       -- Table location and write settings
       'connector' = 'hudi',
       'path' = 'hdfs://my/path/to/hfse_flink_writes',
       'table.type' = 'COPY_ON_WRITE',
       'write.operation' = 'insert',
       'write.tasks' = '2',
       'hoodie.parquet.small.file.limit' = '104857600',
       'hoodie.parquet.max.file.size' = '268435456',
       'hoodie.parquet.compression.ratio' = '0.1',

       -- Record key / partition path
       'hoodie.datasource.write.recordkey.field' = 'val',
       'hoodie.datasource.write.hive_style_partitioning' = 'true',
       'hoodie.datasource.write.partitionpath.field' = 'dt',
       'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',

       -- Clustering
       'clustering.schedule.enabled' = 'true',
       'clustering.async.enabled' = 'true',
       'clustering.plan.strategy.small.file.limit' = '1800', -- 1800MB
       'clustering.plan.strategy.target.file.max.bytes' = '2147483648', -- 2GB
       'clustering.tasks' = '5',
       'clustering.delta_commits' = '5',

       -- Hive sync
       'hive_sync.enable' = 'true',
       'hive_sync.metastore.uris' = 'thrift://hive.metastore.io:9083',
       'hive_sync.use_jdbc' = 'false',
       'hive_sync.mode' = 'hms',
       'hive_sync.db' = 'dev_hudi',
       'hive_sync.table' = 'hfse_flink_writes'
   );
   
   -- Copy every source column, including `array_col`, into the Hudi sink.
   INSERT INTO wt_test_hudi
   SELECT
       `val`,
       `event_time`,
       `partition`,
       `offset`,
       `map_col`,
       `array_col`,
       DATE_FORMAT(event_time, 'yyyy-MM-dd')  -- value for the `dt` partition column
   FROM input_table;
   ```
   
   # Query the table results on Spark
   ```sql
   -- Refresh the table, then list the rows where the new column is populated.
   REFRESH TABLE dev_hudi.hfse_flink_writes;

   SELECT *
   FROM dev_hudi.hfse_flink_writes
   WHERE `array_col` IS NOT NULL;
   ```
   
   # PS
   Can you please help to check if the configs below contain the new column 
spec definition that was added via Spark?
   
   1. getOrderedColumnExpr()
   2. AppConfig.output.hudiSink.targetTable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to