soumilshah1995 opened a new issue, #9777:
URL: https://github.com/apache/hudi/issues/9777
Mac
```
version: "3.7"
services:
  postgres:
    image: arm64v8/postgres:13  # Use a compatible ARM64 image
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
```
Windows
```
version: "3.7"
services:
  postgres:
    image: postgres:13  # Use a compatible x86_64 image
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
```
Step 2: Create Table on Postgres
```
-- Create a table named 'shipments' with the following columns:
CREATE TABLE shipments (
  shipment_id SERIAL NOT NULL PRIMARY KEY, -- Auto-incremented shipment ID, primary key
  order_id SERIAL NOT NULL,                -- Auto-incremented order ID, not null
  origin VARCHAR(255) NOT NULL,            -- Origin location, not null
  destination VARCHAR(255) NOT NULL,       -- Destination location, not null
  is_arrived BOOLEAN NOT NULL              -- Boolean indicating if the shipment has arrived, not null
);
-- Reset the sequence for the 'shipment_id' column to start from 1001
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
-- Set the REPLICA IDENTITY for the 'shipments' table to FULL, which allows replica tables to store full row values.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
-- Insert three records into the 'shipments' table with default values for 'shipment_id', and specific values for other columns.
INSERT INTO shipments
VALUES (default, 10001, 'Beijing', 'Shanghai', false),  -- Insert shipment from Beijing to Shanghai, not arrived
       (default, 10002, 'Hangzhou', 'Shanghai', false), -- Insert shipment from Hangzhou to Shanghai, not arrived
       (default, 10003, 'Shanghai', 'Hangzhou', false); -- Insert shipment from Shanghai to Hangzhou, not arrived
ALTER SYSTEM SET wal_level = 'logical';
```
# SET access keys
```
import os
os.environ['AWS_ACCESS_KEY_ID'] = "XX"
os.environ['AWS_ACCESS_KEY'] = "XX"
os.environ['AWS_SECRET_ACCESS_KEY'] = "XX"
os.environ['AWS_SECRET_KEY'] = "XX"
```

```
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
from faker import Faker
# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Get the current working directory
CURRENT_DIR = os.getcwd()
# Define a list of JAR file names you want to add
jar_files = [
"flink-sql-connector-postgres-cdc-2.4.1.jar",
"postgresql-42.6.0.jar",
"flink-connector-jdbc-1.16.1.jar",
# "flink-s3-fs-hadoop-1.16.1.jar",
"flink-s3-fs-hadoop-1.16.0.jar",
"hudi-flink1.16-bundle-0.13.0.jar"
]
# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]
table_env.get_config().get_configuration().set_string(
"pipeline.jars",
";".join(jar_urls)
)
```
Create Source
```
postgres_sink = f"""
CREATE TABLE shipments_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'slot.name' = 'sales',
'decoding.plugin.name' = 'pgoutput',
'table-name' = 'shipments'
);
"""
# Execute the SQL to create the Postgres CDC source table
table_env.execute_sql(postgres_sink)
```
Hudi Sink
```
hudi_output_path = 's3a://datateam-sandbox-qa-demo/tmp4/shipments/'
hudi_sink = f"""
CREATE TABLE shipments_hudi_sink (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
)
PARTITIONED BY (`destination`)
WITH (
'connector' = 'hudi',
'path' = '{hudi_output_path}' ,
'table.type' = 'MERGE_ON_READ'
);
"""
# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink)
```
# Read from Source and Write into SINK
```
# Define the data to be inserted into the Hudi table
table_env.execute_sql("""
INSERT INTO shipments_hudi_sink
SELECT * FROM shipments_source
""").wait()
```
Logs
```
2023-09-23 15:59:46,371 WARN org.apache.hadoop.metrics2.impl.MetricsConfig
[] - Cannot locate configuration: tried
hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-23 15:59:46,379 WARN org.apache.hadoop.metrics2.util.MBeans
[] - Failed to register MBean
"Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Stats": Instance already
exists.
2023-09-23 15:59:46,380 INFO
org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled
Metric snapshot period at 10 second(s).
2023-09-23 15:59:46,380 INFO
org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] -
s3a-file-system metrics system started
2023-09-23 15:59:46,383 WARN org.apache.hadoop.metrics2.util.MBeans
[] - Failed to register MBean
"Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Control": Instance
already exists.
2023-09-23 15:59:46,384 WARN org.apache.hadoop.metrics2.util.MBeans
[] - Failed to register MBean
"Hadoop:service=s3a-file-system,name=S3AMetrics1-datateam-sandbox-qa-demo":
Instance already exists.
2023-09-23 15:59:55,354 WARN org.apache.hadoop.metrics2.impl.MetricsConfig
[] - Cannot locate configuration: tried
hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-23 15:59:55,426 WARN org.apache.hadoop.metrics2.util.MBeans
[] - Failed to register MBean
"Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Stats": Instance already
exists.
2023-09-23 16:00:00,433 INFO
org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled
Metric snapshot period at 10 second(s).
2023-09-23 16:00:00,434 INFO
org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] -
s3a-file-system metrics system started
2023-09-23 16:00:00,438 WARN org.apache.hadoop.metrics2.util.MBeans
[] - Failed to register MBean
"Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Control": Instance
already exists.
2023-09-23 16:00:00,445 WARN org.apache.hadoop.metrics2.util.MBeans
[] - Failed to register MBean
"Hadoop:service=s3a-file-system,name=S3AMetrics1-datateam-sandbox-qa-demo":
Instance already exists.
2023-09-23 16:00:00,497 WARN org.apache.hadoop.metrics2.util.MBeans
[] - Failed to register MBean
"Hadoop:service=s3a-file-system,name=S3AMetrics24-datateam-sandbox-qa-demo":
Instance already exists.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add
this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add
this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with
module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException:
Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense
failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense
failed.]
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with
module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException:
Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense
failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense
failed.]
```
On AWS S3 I see the Hudi folder, but I don't see any parquet or log files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]