GitHub user ltuantai95 closed a discussion: Can't read Hudi format on Hadoop when streaming from Kafka
I have the following code that streams data from Kafka to Hadoop in Hudi format:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # good for local testing
t_env = StreamTableEnvironment.create(env)

BOOTSTRAP_SERVERS = "kafka:9092"  # your Kafka bootstrap servers
ORDERS_TOPIC = "orders_topic"
GROUP_ID = "flink-consumer-group89"

# Kafka source table that reads JSON messages from the orders topic
t_env.execute_sql(
    f"""
    CREATE TABLE orders_table (
        order_id INT,
        customer_id INT,
        product STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{ORDERS_TOPIC}',
        'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
        'properties.group.id' = '{GROUP_ID}',
        'format' = 'json',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true',
        'json.map-null-key.mode' = 'DROP',
        'scan.startup.mode' = 'earliest-offset'
    )
    """
)

# Hudi copy-on-write sink table on HDFS
t_env.execute_sql("""
    CREATE TABLE hudi_sink (
        order_id BIGINT PRIMARY KEY NOT ENFORCED,
        customer_id INT,
        product STRING
    ) WITH (
        'connector' = 'hudi',
        'path' = 'hdfs://192.168.16.40:8020/table1',
        'table.type' = 'COPY_ON_WRITE'
    )
""")

# Start the continuous streaming insert.
# This job runs until you cancel it (Ctrl+C or cancel the Flink job).
t_env.execute_sql("""
    INSERT INTO hudi_sink
    SELECT * FROM orders_table
""")
```
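One thing I am not sure about: checkpointing is never enabled above. From what I have read, Hudi's Flink writer only commits data to the table when a Flink checkpoint completes, so a streaming job without checkpointing may never produce a commit (and hence no parquet files). A minimal sketch of enabling it (the 60-second interval is just an example):
```python
# Assumption: the Hudi Flink writer commits on Flink checkpoints, so a
# streaming job with checkpointing disabled never finalizes any data files.
env.enable_checkpointing(60_000)  # checkpoint every 60 s (example interval)
```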
And this is the code I use to read the data back with Spark:
```python
from pyspark.sql import SparkSession

# Initialize a Spark session with Hudi support
spark = SparkSession.builder \
    .appName("Read Hudi Table on HDFS") \
    .master("local[*]") \
    .config("spark.jars.packages",
            "org.apache.hudi:hudi-spark3.3-bundle_2.12:1.1.1") \
    .getOrCreate()

# Path to the Hudi table on HDFS
hudi_table_path = "hdfs://localhost:8020/table1"

# Read the latest snapshot (the default behavior)
hudi_df = spark.read \
    .format("hudi") \
    .load(hudi_table_path)

# Show some data
hudi_df.show(10, truncate=False)

# Or register a temp view and query with SQL
hudi_df.createOrReplaceTempView("my_hudi_table")
spark.sql("SELECT * FROM my_hudi_table LIMIT 10").show()
```
The job submits successfully with no errors, but no parquet files appear on HDFS and no data shows up when reading.
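To check whether any commit actually landed, here is a sketch that lists the table's `.hoodie` metadata folder through Spark's Hadoop filesystem (this uses Spark's internal `_jvm`/`_jsc` py4j handles and assumes the reader can reach the same HDFS the Flink job wrote to; note the writer above used `hdfs://192.168.16.40:8020/table1` while the reader uses `hdfs://localhost:8020/table1`):
```python
# Sketch: inspect the Hudi timeline to see whether any commit completed.
# Depending on the Hudi release, completed commits appear directly under
# .hoodie/ or under .hoodie/timeline/. An empty timeline would mean the
# Flink job never committed, which would explain the missing parquet files.
jvm = spark._jvm                                # py4j gateway (internal API)
hadoop_conf = spark._jsc.hadoopConfiguration()  # active Hadoop configuration
path = jvm.org.apache.hadoop.fs.Path(hudi_table_path + "/.hoodie")
fs = path.getFileSystem(hadoop_conf)
for status in fs.listStatus(path):
    print(status.getPath().getName())
```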
Thanks for your help!
GitHub link: https://github.com/apache/hudi/discussions/17981