GitHub user ltuantai95 created a discussion: Can't read Hudi table on Hadoop that is streamed from Kafka

I have the following code that streams data from Kafka to Hadoop in Hudi format:
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # good for local testing
t_env = StreamTableEnvironment.create(env)

BOOTSTRAP_SERVERS = "kafka:9092"  # Your Kafka bootstrap servers
ORDERS_TOPIC = "orders_topic"
GROUP_ID = "flink-consumer-group89" 

t_env.execute_sql(
    f"""
    CREATE TABLE orders_table (
        order_id INT,
        customer_id INT,
        product STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{ORDERS_TOPIC}',
        'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
        'properties.group.id' = '{GROUP_ID}',
        'format' = 'json',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true',
        'json.map-null-key.mode' = 'DROP',
        'scan.startup.mode' = 'earliest-offset'
    )
"""
)

t_env.execute_sql("""
CREATE TABLE hudi_sink (
    order_id BIGINT PRIMARY KEY NOT ENFORCED,
    customer_id INT,
    product STRING
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://192.168.16.40:8020/table1',
    'table.type' = 'COPY_ON_WRITE'
)
""")

# Start the continuous streaming insert
# This job will run forever until you cancel it (Ctrl+C or kill the Flink job)
t_env.execute_sql("""
INSERT INTO hudi_sink
SELECT * FROM orders_table
""")
```
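One thing I am unsure about: as far as I know, the Hudi Flink writer only flushes and commits data files when a Flink checkpoint completes, and `execute_sql()` submits the INSERT asynchronously, so a plain script may exit before any commit ever happens. A minimal sketch of the two changes I mean (the 10-second interval is an arbitrary assumption):
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# Assumption: Hudi's Flink writer commits on checkpoint completion,
# so without checkpointing enabled no parquet files ever appear.
env.enable_checkpointing(10000)  # every 10 s (arbitrary value)
t_env = StreamTableEnvironment.create(env)

# ... CREATE TABLE statements as above ...

# execute_sql() only submits the job; wait() blocks so the script
# (and the streaming insert with it) is not torn down immediately.
t_env.execute_sql("""
INSERT INTO hudi_sink
SELECT * FROM orders_table
""").wait()
```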
And here is the code I use to read the data back with Spark:
```
from pyspark.sql import SparkSession

# Initialize Spark session with Hudi support
spark = SparkSession.builder \
    .appName("Read Hudi Table on HDFS") \
    .master("local[*]") \
    .config("spark.jars.packages", 
"org.apache.hudi:hudi-spark3.3-bundle_2.12:1.1.1") \
    .getOrCreate()

# Path to your Hudi table on HDFS
hudi_table_path = "hdfs://localhost:8020/table1"  
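# NOTE: the Flink job above wrote to hdfs://192.168.16.40:8020/table1;
# "localhost" here only points at the same NameNode if this script
# runs on 192.168.16.40 itself.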

# Read the latest snapshot (default behavior)
hudi_df = spark.read \
    .format("hudi") \
    .load(hudi_table_path)

# Show some data
hudi_df.show(10, truncate=False)

# Or register as temp view for SQL
hudi_df.createOrReplaceTempView("my_hudi_table")
spark.sql("SELECT * FROM my_hudi_table LIMIT 10").show()
```
The job submits successfully with no errors, but no parquet files appear under the table path, and the Spark read shows no data.
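A quick sanity check, in case it helps to narrow things down: a Hudi table that has committed at least once should contain a `.hoodie` metadata directory next to the data files. A sketch that lists the table path through Spark's underlying Hadoop FileSystem (the `_jvm`/`_jsc` handles are internal py4j accessors, but they work for this):
```
# List the Hudi table directory to see whether any commit landed.
# A healthy table shows a ".hoodie" entry plus the parquet files.
jvm = spark._jvm
conf = spark._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(
    jvm.java.net.URI(hudi_table_path), conf)
table_dir = jvm.org.apache.hadoop.fs.Path(hudi_table_path)
if fs.exists(table_dir):
    for status in fs.listStatus(table_dir):
        print(status.getPath().toString())
else:
    print("Table path does not exist:", hudi_table_path)
```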

Thanks for your help!

GitHub link: https://github.com/apache/hudi/discussions/17981
