voonhous commented on issue #9256:
URL: https://github.com/apache/hudi/issues/9256#issuecomment-1647280925
@jlloh I'm able to write a newly added column with the following steps:
# Create a Hudi-on-Flink append-only job
```sql
CREATE TABLE input_table (
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
,`map_col` MAP<STRING, INT>
) WITH (
'connector' = 'datagen',
'fields.val.length' = '99',
'rows-per-second' = '50'
);
CREATE TABLE wt_test_hudi
(
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
,`map_col` MAP<STRING, INT>
,`dt` STRING
) PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://my/path/to/hfse_flink_writes',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'insert',
'hoodie.parquet.small.file.limit' = '104857600',
'hoodie.parquet.max.file.size' = '268435456',
'hoodie.datasource.write.recordkey.field' = 'val',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hoodie.datasource.write.partitionpath.field' = 'dt',
'hoodie.datasource.write.keygenerator.class' =
'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'write.tasks' = '2',
'clustering.schedule.enabled' = 'true',
'clustering.async.enabled' = 'true',
'clustering.plan.strategy.small.file.limit' = '1800', -- 1800MB
'clustering.plan.strategy.target.file.max.bytes' = '2147483648', -- 2GB
'clustering.tasks' = '5',
'clustering.delta_commits' = '5',
'hoodie.parquet.compression.ratio' = '0.1',
'hive_sync.enable'='true',
'hive_sync.metastore.uris' = 'thrift://hive.metastore.io:9083',
'hive_sync.use_jdbc' = 'false',
'hive_sync.mode' = 'hms',
'hive_sync.db' = 'dev_hudi',
'hive_sync.table' = 'hfse_flink_writes'
);
insert into wt_test_hudi
select `val`
,`event_time`
,`partition`
,`offset`
,`map_col`
,DATE_FORMAT(event_time, 'yyyy-MM-dd')
from input_table;
```
# Stop the Flink job (with or without savepoint) + alter the table on Spark
In a Spark session, update the table schema:
```sql
ALTER TABLE dev_hudi.hfse_flink_writes ADD COLUMNS (`array_col` ARRAY<INT>
AFTER `map_col`);
```
This will add the column `array_col` after `map_col`.
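Before restarting the Flink job, it can help to confirm that the metastore picked up the change. A quick sanity check from the same Spark session (table name as in the example above):

```sql
-- The new column should appear between map_col and dt
DESCRIBE TABLE dev_hudi.hfse_flink_writes;
```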
# Update Hudi-on-Flink append-only job with the new schema
Update the Flink-SQL and start the job again.
```sql
CREATE TABLE input_table (
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
,`map_col` MAP<STRING, INT>
,`array_col` ARRAY<INT>
) WITH (
'connector' = 'datagen',
'fields.val.length' = '99',
'rows-per-second' = '50'
);
CREATE TABLE wt_test_hudi
(
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
,`map_col` MAP<STRING, INT>
,`array_col` ARRAY<INT>
,`dt` STRING
) PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://my/path/to/hfse_flink_writes',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'insert',
'hoodie.parquet.small.file.limit' = '104857600',
'hoodie.parquet.max.file.size' = '268435456',
'hoodie.datasource.write.recordkey.field' = 'val',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hoodie.datasource.write.partitionpath.field' = 'dt',
'hoodie.datasource.write.keygenerator.class' =
'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'write.tasks' = '2',
'clustering.schedule.enabled' = 'true',
'clustering.async.enabled' = 'true',
'clustering.plan.strategy.small.file.limit' = '1800', -- 1800MB
'clustering.plan.strategy.target.file.max.bytes' = '2147483648', -- 2GB
'clustering.tasks' = '5',
'clustering.delta_commits' = '5',
'hoodie.parquet.compression.ratio' = '0.1',
'hive_sync.enable'='true',
'hive_sync.metastore.uris' = 'thrift://hive.metastore.io:9083',
'hive_sync.use_jdbc' = 'false',
'hive_sync.mode' = 'hms',
'hive_sync.db' = 'dev_hudi',
'hive_sync.table' = 'hfse_flink_writes'
);
insert into wt_test_hudi
select `val`
,`event_time`
,`partition`
,`offset`
,`map_col`
,`array_col`
,DATE_FORMAT(event_time, 'yyyy-MM-dd')
from input_table;
```
# Query the table results on Spark
```sql
REFRESH TABLE dev_hudi.hfse_flink_writes;
SELECT * FROM dev_hudi.hfse_flink_writes WHERE `array_col` is not null;
```
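As a further sanity check (assuming clustering has not yet rewritten the old file groups), rows written before the `ALTER TABLE` should surface the new column as `NULL`:

```sql
-- Pre-evolution rows have no array_col value, so Hudi reads them back as NULL
SELECT COUNT(*) FROM dev_hudi.hfse_flink_writes WHERE `array_col` IS NULL;
```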
# PS
Could you please check whether the configs below contain the new column
spec definition that was added via Spark?
1. getOrderedColumnExpr()
2. AppConfig.output.hudiSink.targetTable