soumilshah1995 opened a new issue, #11105:
URL: https://github.com/apache/hudi/issues/11105
Hello,
I'm creating content for our community on integrating Apache Flink (version 1.17.1) with Python (3.8.19), and while building the setup I've hit an issue that I need the community's help with.
Setup:
Apache Flink version: 1.17.1
Python version: 3.8.19
Steps to Reproduce:
The Docker Compose file below, together with the referenced configuration directories, reproduces the setup. These are the essential services:
```
version: "3"

services:
  mysql:
    image: quay.io/debezium/example-mysql:2.1
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    restart: always

  fast-data-dev:
    image: dougdonohoe/fast-data-dev
    ports:
      - "3181:3181"
      - "3040:3040"
      - "7081:7081"
      - "7082:7082"
      - "7083:7083"
      - "7092:7092"
      - "8081:8081"
    environment:
      - ZK_PORT=3181
      - WEB_PORT=3040
      - REGISTRY_PORT=8081
      - REST_PORT=7082
      - CONNECT_PORT=7083
      - BROKER_PORT=7092
      - ADV_HOST=127.0.0.1

  trino-coordinator:
    image: 'trinodb/trino:latest'
    hostname: trino-coordinator
    ports:
      - '8080:8080'
    volumes:
      - ./trino/etc:/etc/trino

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
    command: ["postgres", "-c", "wal_level=logical"]
    healthcheck:
      test: ["CMD", "psql", "-U", "hive", "-c", "SELECT 1"]
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - ./postgresscripts:/docker-entrypoint-initdb.d

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://warehouse/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi
```
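Once the stack is up (`docker compose up -d`), a quick sanity check before pointing Flink at it is to confirm the mapped ports are actually reachable from the host. This is a minimal sketch using only Python's standard library; the hostnames and ports come from the compose file above:

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Ports published by the compose file above
for name, port in [("hive-metastore", 9083), ("minio", 9000), ("trino", 8080)]:
    status = "reachable" if port_open("localhost", port) else "NOT reachable"
    print(f"{name:15s} localhost:{port} -> {status}")
```

If `localhost:9083` is not reachable here, the Flink Hive sync cannot work either, which narrows the problem to Docker networking rather than Hudi configuration.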
Issue:
The problem appears as soon as I add the following three Hive sync parameters to the table definition in my Flink setup:
```
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://localhost:9083'
```
With these three parameters set, inserting records fails. When I remove them, everything works and records are inserted normally.
Additional Context:
This setup has previously been tested with Delta Streamer, as documented
[here](https://github.com/soumilshah1995/universal-datalakehouse-postgres-ingestion-deltastreamer).
The goal is to achieve the same functionality with Apache Flink instead of Spark.
Code Snippet:
The full snippet below demonstrates the setup and execution: defining the Hudi table, inserting records, and querying them back.
```
import os

# PyFlink needs a JDK; point JAVA_HOME at a local JDK 11 install
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@11'

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

CURRENT_DIR = os.getcwd()

# JAR files to add to the pipeline classpath
jar_files = [
    "jar/flink-s3-fs-hadoop-1.17.1.jar",
    "jar/hudi-flink1.17-bundle-0.14.0.jar"
]
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)

hudi_output_path = 'file:///Users/soumilshah/Desktop/my-flink-environment/hudi/'

hudi_sink = f"""
CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'COPY_ON_WRITE',
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://localhost:9083'
);
"""

# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink).wait()

# Insert sample records into the Hudi table
query = """
INSERT INTO hudi_table VALUES
  (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
  (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
  (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
  (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
  (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
  (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo'),
  (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai'),
  (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
"""
# Wait for the batch insert to finish before querying
table_env.execute_sql(query).wait()

# Select one record back from the Hudi table
query = """
SELECT * FROM hudi_table WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
"""
table_env.execute_sql(query).print()
```
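One failure mode worth ruling out before debugging the Hive sync itself: `pipeline.jars` does not complain loudly when a listed path is wrong. Below is a small helper sketch of my own (not part of the PyFlink API; `typing.List` is used to stay compatible with Python 3.8) that fails fast if a jar is missing from disk:

```python
import os
from typing import List

def build_jar_urls(base_dir: str, jar_files: List[str]) -> str:
    """Build the semicolon-separated value for pipeline.jars,
    raising immediately if any listed jar does not exist."""
    urls = []
    for jar in jar_files:
        path = os.path.join(base_dir, jar)
        if not os.path.isfile(path):
            raise FileNotFoundError(f"pipeline jar not found: {path}")
        # os.path.abspath starts with '/', so this yields file:///...
        urls.append(f"file://{os.path.abspath(path)}")
    return ";".join(urls)
```

It would slot into the snippet above as `set_string("pipeline.jars", build_jar_urls(os.getcwd(), jar_files))`.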
Request for Assistance:
I'd appreciate help from the community on correctly configuring the Hive sync parameters in this Flink setup. Any guidance or insights would be greatly appreciated.
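For anyone reproducing this: when `hive_sync` fails, the metastore-side cause is often buried several `Caused by` levels deep in the Py4J error. A minimal wrapper sketch (my own naming, nothing Flink-specific beyond `execute_sql`) that prints the full exception chain before re-raising can make the root cause visible:

```python
import traceback

def run_sql(table_env, sql: str) -> None:
    """Execute a statement and dump the full exception chain on failure.
    The chain usually contains the real metastore error, e.g. a
    connection refused or a missing Hive class on the classpath."""
    try:
        table_env.execute_sql(sql).wait()
    except Exception:
        traceback.print_exc()
        raise
```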