soumilshah1995 opened a new issue, #11105:
URL: https://github.com/apache/hudi/issues/11105

   Hello,
   
   I'm currently working on creating comprehensive content for our community, 
focusing on integrating Apache Flink (version 1.17.1) with Python (3.8.19). 
While building the setup, I've encountered an issue that requires assistance 
from the community.
   
   Setup:
   
   Apache Flink version: 1.17.1
   Python version: 3.8.19
   Apache Hudi version: 0.14.0 (hudi-flink1.17-bundle)
   
   Steps to Reproduce:
   
   I've provided a Docker Compose file along with the necessary configurations 
to replicate the setup. Below are the essential components included in the 
Docker Compose file:
   ```
   version: "3"
   
   services:
     mysql:
       image: quay.io/debezium/example-mysql:2.1
       container_name: mysql
       ports:
         - "3306:3306"
       environment:
         MYSQL_ROOT_PASSWORD: debezium
         MYSQL_USER: mysqluser
         MYSQL_PASSWORD: mysqlpw
       restart: always
   
     fast-data-dev:
       image: dougdonohoe/fast-data-dev
       ports:
         - "3181:3181"
         - "3040:3040"
         - "7081:7081"
         - "7082:7082"
         - "7083:7083"
         - "7092:7092"
         - "8081:8081"
       environment:
         - ZK_PORT=3181
         - WEB_PORT=3040
         - REGISTRY_PORT=8081
         - REST_PORT=7082
         - CONNECT_PORT=7083
         - BROKER_PORT=7092
         - ADV_HOST=127.0.0.1
   
     trino-coordinator:
       image: 'trinodb/trino:latest'
       hostname: trino-coordinator
       ports:
         - '8080:8080'
       volumes:
         - ./trino/etc:/etc/trino
   
     metastore_db:
       image: postgres:11
       hostname: metastore_db
       ports:
         - 5432:5432
       environment:
         POSTGRES_USER: hive
         POSTGRES_PASSWORD: hive
         POSTGRES_DB: metastore
       command: ["postgres", "-c", "wal_level=logical"]
       healthcheck:
         test: ["CMD", "psql", "-U", "hive", "-c", "SELECT 1"]
         interval: 10s
         timeout: 5s
         retries: 5
       volumes:
         - ./postgresscripts:/docker-entrypoint-initdb.d
   
     hive-metastore:
       hostname: hive-metastore
       image: 'starburstdata/hive:3.1.2-e.18'
       ports:
         - '9083:9083' # Metastore Thrift
       environment:
         HIVE_METASTORE_DRIVER: org.postgresql.Driver
         HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
         HIVE_METASTORE_USER: hive
         HIVE_METASTORE_PASSWORD: hive
         HIVE_METASTORE_WAREHOUSE_DIR: s3://warehouse/
         S3_ENDPOINT: http://minio:9000
         S3_ACCESS_KEY: admin
         S3_SECRET_KEY: password
         S3_PATH_STYLE_ACCESS: "true"
         REGION: ""
         GOOGLE_CLOUD_KEY_FILE_PATH: ""
         AZURE_ADL_CLIENT_ID: ""
         AZURE_ADL_CREDENTIAL: ""
         AZURE_ADL_REFRESH_URL: ""
         AZURE_ABFS_STORAGE_ACCOUNT: ""
         AZURE_ABFS_ACCESS_KEY: ""
         AZURE_WASB_STORAGE_ACCOUNT: ""
         AZURE_ABFS_OAUTH: ""
         AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
         AZURE_ABFS_OAUTH_CLIENT_ID: ""
         AZURE_ABFS_OAUTH_SECRET: ""
         AZURE_ABFS_OAUTH_ENDPOINT: ""
         AZURE_WASB_ACCESS_KEY: ""
         HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
       depends_on:
         - metastore_db
       healthcheck:
         test: bash -c "exec 6<> /dev/tcp/localhost/9083"
   
     minio:
       image: minio/minio
       environment:
         - MINIO_ROOT_USER=admin
         - MINIO_ROOT_PASSWORD=password
         - MINIO_DOMAIN=minio
       networks:
         default:
           aliases:
             - warehouse.minio
       ports:
         - 9001:9001
         - 9000:9000
       command: ["server", "/data", "--console-address", ":9001"]
   
     mc:
       depends_on:
         - minio
       image: minio/mc
       environment:
         - AWS_ACCESS_KEY_ID=admin
         - AWS_SECRET_ACCESS_KEY=password
         - AWS_REGION=us-east-1
       entrypoint: >
         /bin/sh -c "
         until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
         /usr/bin/mc rm -r --force minio/warehouse;
         /usr/bin/mc mb minio/warehouse;
         /usr/bin/mc policy set public minio/warehouse;
         tail -f /dev/null
         "
   
   
   
   volumes:
     hive-metastore-postgresql:
   
   networks:
     default:
       name: hudi
   ```
   
   Issue:
   
   The problem arises when I include the following three Hive sync options in 
the `WITH` clause of the Hudi table definition:
   
   ```
   'hive_sync.enable' = 'true',
   'hive_sync.mode' = 'hms',
   'hive_sync.metastore.uris' = 'thrift://localhost:9083'
   
   ```
   
   With these options included, I'm unable to insert records into the Hudi 
table. When I remove them, the same script works and the records are inserted 
as expected.
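
One quick check before digging into Hudi itself is whether the metastore Thrift endpoint given in `hive_sync.metastore.uris` is reachable from the machine that runs the PyFlink script. The sketch below is only a TCP-level probe built on the standard library. `metastore_reachable` is a hypothetical helper name, and a successful connection shows only that something is listening on the port, not that the Hive Metastore is healthy.

```python
import socket
from urllib.parse import urlparse


def metastore_reachable(uri: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the host and port in `uri` succeeds."""
    parsed = urlparse(uri)  # e.g. thrift://localhost:9083
    if parsed.hostname is None or parsed.port is None:
        raise ValueError(f"expected thrift://host:port, got {uri!r}")
    try:
        # Open and immediately close a plain TCP connection.
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print(metastore_reachable("thrift://localhost:9083"))
```

If this prints `False` on the host while the `hive-metastore` container is up, the port mapping or the hostname in the URI would be the first thing to look at.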
   
   Additional Context:
   
   This setup was previously tested with Hudi DeltaStreamer, as documented 
[here](https://github.com/soumilshah1995/universal-datalakehouse-postgres-ingestion-deltastreamer).
 The goal is to achieve the same functionality with Apache Flink instead of 
Spark.
   
   
   Code Snippet:
   
   I've provided a full code snippet demonstrating the setup and execution 
process. This includes defining the Hudi table, executing SQL queries, and 
attempting to insert records into the Hudi table.
   
   
   
   ```
   import os
   
   # Point PyFlink at a local JDK 11 install before the JVM is started
   os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@11'
   
   from pyflink.table import EnvironmentSettings, TableEnvironment
   
   # Create a batch TableEnvironment
   env_settings = EnvironmentSettings.in_batch_mode()
   table_env = TableEnvironment.create(env_settings)
   CURRENT_DIR = os.getcwd()
   
   # Define a list of JAR file names you want to add
   jar_files = [
       "jar/flink-s3-fs-hadoop-1.17.1.jar",
       "jar/hudi-flink1.17-bundle-0.14.0.jar"
   ]
   
   jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]
   
   table_env.get_config().get_configuration().set_string(
       "pipeline.jars",
       ";".join(jar_urls)
   )
   
   hudi_output_path = 'file:////Users/soumilshah/Desktop/my-flink-environment/hudi/'
   
   hudi_sink = f"""
   CREATE TABLE hudi_table(
       ts BIGINT,
       uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
       rider VARCHAR(20),
       driver VARCHAR(20),
       fare DOUBLE,
       city VARCHAR(20)
   )
   PARTITIONED BY (`city`)
   WITH (
       'connector' = 'hudi',
       'path' = '{hudi_output_path}' ,
       'table.type' = 'COPY_ON_WRITE' ,
       'hive_sync.enable' = 'true',
       'hive_sync.mode' = 'hms',
       'hive_sync.metastore.uris' = 'thrift://localhost:9083'
   );
   """
   
   # Execute the SQL to create the Hudi table
   table_env.execute_sql(hudi_sink).wait()
   
   # Define the SQL statement that inserts sample records into the Hudi table
   query = """
   INSERT INTO hudi_table
   VALUES
   (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
   (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
   (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
   (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
   (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
   (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo'),
   (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai'),
   (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
   """
   
   # Block until the insert job finishes so the query below can see the data
   table_env.execute_sql(query).wait()
   
   # Define the SQL query to select data from the Hudi table source
   query = """
   SELECT * FROM hudi_table WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
   """
   
   table_env.execute_sql(query).print()
   
   ```
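
To confirm that the three `hive_sync.*` options are the only difference between the working and the failing run, it can help to generate both DDL variants from one place. The following is a minimal sketch under that assumption. `build_hudi_ddl` and its parameters are hypothetical names introduced for illustration, and only the table options already shown in this report are used.

```python
from typing import Dict, Optional

# Column list copied from the table definition in this report.
COLUMNS = """
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
""".strip("\n")


def build_hudi_ddl(path: str, metastore_uri: Optional[str] = None) -> str:
    """Build the CREATE TABLE statement, adding Hive sync options only on request."""
    options: Dict[str, str] = {
        "connector": "hudi",
        "path": path,
        "table.type": "COPY_ON_WRITE",
    }
    if metastore_uri is not None:
        # The three options that trigger the reported behaviour.
        options.update({
            "hive_sync.enable": "true",
            "hive_sync.mode": "hms",
            "hive_sync.metastore.uris": metastore_uri,
        })
    with_clause = ",\n".join(f"    '{k}' = '{v}'" for k, v in options.items())
    return (
        "CREATE TABLE hudi_table(\n"
        f"{COLUMNS}\n"
        ")\n"
        "PARTITIONED BY (`city`)\n"
        "WITH (\n"
        f"{with_clause}\n"
        ")"
    )


if __name__ == "__main__":
    print(build_hudi_ddl("file:///tmp/hudi/"))
    print(build_hudi_ddl("file:///tmp/hudi/", "thrift://localhost:9083"))
```

Each variant can then be passed to `table_env.execute_sql(...)` in a fresh session, so the two runs differ only in the Hive sync options.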
   
   Request for Assistance:
   
   I'm seeking assistance from the community to understand how to properly 
configure Hive sync parameters in my Flink setup. Any guidance or insights 
would be greatly appreciated.
   
   

