GitHub user ltuantai95 added a comment to the discussion: Can't write hudi format in catalog
I enabled checkpointing; the source table (orders_datagen) doesn't have a dataset.
If I don't use a catalog and run the code below, it works OK:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1) # good for local testing
env.enable_checkpointing(10000) # Checkpoint every 10 seconds
# Optional: more checkpoint settings (recommended)
# env.get_checkpoint_config().set_checkpoint_timeout(120000)
# 1. Create a streaming TableEnvironment
t_env = StreamTableEnvironment.create(env)

# 2. Create the infinite DataGen source table
t_env.execute_sql("""
    CREATE TABLE orders_datagen (
        order_number BIGINT,
        price DECIMAL(10,2)
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '10000',
        'fields.price.kind' = 'random',
        'fields.price.min' = '20.00',
        'fields.price.max' = '10000.00'
    )
""")
# 3. Create the Hudi sink table (streaming upsert supported)
# MOR (MERGE_ON_READ) is usually better for streaming; the alternative is COPY_ON_WRITE
t_env.execute_sql("""
    CREATE TABLE hudi_sink (
        order_number BIGINT PRIMARY KEY NOT ENFORCED,
        price DECIMAL(10,2)
    ) WITH (
        'connector' = 'hudi',
        'path' = 'hdfs://namenode:8020/table1',
        'table.type' = 'COPY_ON_WRITE'
    )
""")
# 4. Start continuous streaming insert
# This job will run forever until you cancel it (Ctrl+C or kill the Flink job)
t_env.execute_sql("""
    INSERT INTO hudi_sink
    SELECT * FROM orders_datagen
""")
```
Maybe I configured the catalog wrong?
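For context, a Hudi catalog in Flink SQL is typically registered with DDL along these lines (a sketch based on the Hudi Flink catalog docs, not my exact config; the catalog name, `'dfs'` mode, and `catalog.path` below are placeholders):
```python
# Minimal sketch of registering a Hudi catalog (placeholder name, mode, and path)
t_env.execute_sql("""
    CREATE CATALOG hudi_catalog WITH (
        'type' = 'hudi',
        'mode' = 'dfs',
        'catalog.path' = 'hdfs://namenode:8020/hudi_catalog'
    )
""")
t_env.execute_sql("USE CATALOG hudi_catalog")
```
Tables created after `USE CATALOG` should then have their DDL persisted under the catalog path instead of being session-scoped.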
GitHub link:
https://github.com/apache/hudi/discussions/17992#discussioncomment-15587875