rangareddy commented on issue #11803:
URL: https://github.com/apache/hudi/issues/11803#issuecomment-2352435271
Hi @Armelabdelkbir,
I have tested your scenario with the following code. Dropping a column also worked without any issues.
Step1: Create a database and a table in Postgres.
```sql
CREATE DATABASE cdc_hudi;
\connect cdc_hudi
CREATE TABLE employees (
    id INT PRIMARY KEY NOT NULL,
    name VARCHAR(100) NOT NULL,
    age INT,
    salary DECIMAL(10, 2),
    department VARCHAR(50)
);
-- FULL replica identity so Debezium receives complete before-images for updates/deletes
ALTER TABLE employees REPLICA IDENTITY FULL;
```
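As an optional sanity check, the replica identity can be read back from `pg_class.relreplident`. The single-letter codes below are the Postgres catalog encoding; the `psql` command in the comment is an assumption about your local setup:

```python
# Decode pg_class.relreplident, e.g. the output of:
#   psql -d cdc_hudi -c "SELECT relreplident FROM pg_class WHERE oid = 'public.employees'::regclass;"
REPLICA_IDENTITY_CODES = {
    "d": "DEFAULT",      # primary key columns only
    "n": "NOTHING",      # no replica identity
    "f": "FULL",         # all columns (what the ALTER TABLE above sets)
    "i": "USING INDEX",  # columns of a chosen unique index
}

def decode_replica_identity(code: str) -> str:
    """Map the single-letter relreplident flag to its SQL keyword."""
    return REPLICA_IDENTITY_CODES.get(code, f"unknown ({code!r})")

if __name__ == "__main__":
    # After ALTER TABLE ... REPLICA IDENTITY FULL, the catalog should report 'f'.
    print(decode_replica_identity("f"))
```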
Step2: Configure the Debezium source connector for the Postgres database.
`vi debezium-source-postgres.json`
```json
{
  "name": "postgres-debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "cdc_hudi",
    "topic.prefix": "cdc_topic",
    "database.server.name": "postgres",
    "schema.include.list": "public",
    "table.include.list": "public.employees",
    "publication.autocreate.mode": "filtered",
    "tombstones.on.delete": "false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081/",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081/",
    "slot.name": "pgslot1"
  }
}
```
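Before POSTing the file, it can be worth checking that the JSON parses and carries the keys this setup relies on. A minimal sketch; the required-key list is an assumption drawn from the config above, not an exhaustive Debezium contract:

```python
import json

# Subset of keys used in the connector config above (not a complete Debezium schema).
REQUIRED_KEYS = {
    "connector.class", "database.hostname", "database.port",
    "database.user", "database.password", "database.dbname",
    "topic.prefix", "table.include.list",
}

def missing_keys(connector_json: str) -> set:
    """Return required config keys absent from a connector JSON document."""
    config = json.loads(connector_json)["config"]
    return REQUIRED_KEYS - config.keys()

if __name__ == "__main__":
    # An intentionally incomplete example; a complete config yields an empty set.
    example = '{"name": "t", "config": {"connector.class": "c"}}'
    print(sorted(missing_keys(example)))
```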
Step3: Create the `postgres-debezium-connector` connector using the above configuration and verify its status.
```sh
curl -s -X POST -H 'Accept: application/json' -H "Content-Type: application/json" \
  -d @debezium-source-postgres.json http://localhost:8083/connectors/ | jq
curl -s -X GET -H "Content-Type: application/json" \
  http://localhost:8083/connectors/postgres-debezium-connector/status | jq
```
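The same Kafka Connect REST endpoints can be driven from a script instead of `curl`. A minimal sketch using only the standard library; `post_connector` needs a running Connect worker, so only the URL helper runs standalone:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint from the curl calls above

def status_url(name: str) -> str:
    """URL that reports a connector's status."""
    return f"{CONNECT_URL}/connectors/{name}/status"

def post_connector(config_path: str) -> dict:
    """POST a connector config file to Kafka Connect (requires a running worker)."""
    with open(config_path, "rb") as f:
        req = urllib.request.Request(
            f"{CONNECT_URL}/connectors/",
            data=f.read(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

if __name__ == "__main__":
    print(status_url("postgres-debezium-connector"))
```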
Step4: Insert one record into the Postgres table and verify that the Kafka topic is created.
```sql
INSERT INTO employees (id, name, age, salary, department) VALUES (1, 'Ranga', 30, 50000.00, 'Sales');
SELECT * FROM employees;
```
```sh
$CONFLUENT_HOME/bin/kafka-topics --list --bootstrap-server localhost:9092 | grep employees
cdc_topic.public.employees
```
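The topic name follows Debezium's `<topic.prefix>.<schema>.<table>` convention for Postgres tables, which is why the connector config above yields `cdc_topic.public.employees`. A quick sketch:

```python
def debezium_topic(prefix: str, schema: str, table: str) -> str:
    """Debezium names Postgres change topics <topic.prefix>.<schema>.<table>."""
    return f"{prefix}.{schema}.{table}"

if __name__ == "__main__":
    # Matches the topic listed above for the connector configured in Step2.
    print(debezium_topic("cdc_topic", "public", "employees"))
```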
Step5: Run the Kafka Avro console consumer to verify that the data is consumed.
```sh
$CONFLUENT_HOME/bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic cdc_topic.public.employees
```
Step6: Run the Spark application
```sh
HUDI_HOME=/Install/apache/hudi
HUDI_UTILITIES_JAR=$(ls $HUDI_HOME/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle*.jar | grep -v sources | grep -v tests)

$SPARK_HOME/bin/spark-submit \
  --master "local[2]" \
  --deploy-mode client \
  --conf 'spark.hadoop.spark.sql.legacy.parquet.nanosAsLong=false' \
  --conf 'spark.hadoop.spark.sql.parquet.binaryAsString=false' \
  --conf 'spark.hadoop.spark.sql.parquet.int96AsTimestamp=true' \
  --conf 'spark.hadoop.spark.sql.caseSensitive=false' \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_JAR \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
  --hoodie-conf bootstrap.servers=localhost:9092 \
  --hoodie-conf schema.registry.url=http://localhost:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/cdc_topic.public.employees-value/versions/latest \
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=cdc_topic.public.employees \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true \
  --table-type MERGE_ON_READ \
  --op UPSERT \
  --target-base-path file:///tmp/debezium/postgres/employees \
  --target-table employees_cdc \
  --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
  --source-ordering-field _event_lsn \
  --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
  --continuous \
  --min-sync-interval-seconds 60
```
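The `hoodie.deltastreamer.schemaprovider.registry.url` above follows the Confluent Schema Registry convention: with the default TopicNameStrategy, a topic's value schema is registered under the `<topic>-value` subject. A sketch assembling that URL:

```python
def latest_value_schema_url(registry: str, topic: str) -> str:
    """Schema Registry URL for the latest value schema of a topic.

    Value schemas live under the '<topic>-value' subject with the default
    TopicNameStrategy used by the Avro converters configured in Step2.
    """
    return f"{registry}/subjects/{topic}-value/versions/latest"

if __name__ == "__main__":
    # Reproduces the registry URL passed to HoodieDeltaStreamer above.
    print(latest_value_schema_url("http://localhost:8081", "cdc_topic.public.employees"))
```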
Step7: Perform the schema evolution operations and verify the data.
```sql
-- Adding Columns
ALTER TABLE employees ADD COLUMN address VARCHAR;
INSERT INTO employees (id, name, age, salary, department, address) VALUES (2, 'Nishanth', 30, 50000.00, 'Sales', 'Bangalore');
SELECT * FROM employees;
```
Step8: Launch another Spark job and verify the data.
```sh
export SPARK_VERSION=3.5
$SPARK_HOME/bin/spark-shell \
  --master "local[2]" \
  --deploy-mode client \
  --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
  --conf 'spark.hadoop.spark.sql.legacy.parquet.nanosAsLong=false' \
  --conf 'spark.hadoop.spark.sql.parquet.binaryAsString=false' \
  --conf 'spark.hadoop.spark.sql.parquet.int96AsTimestamp=true' \
  --conf 'spark.hadoop.spark.sql.caseSensitive=false' \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
```
```scala
val basePath="file:///tmp/debezium/postgres/employees"
spark.read.format("hudi").load(basePath).show(false)
```
Step9: You can repeat the further schema changes below and verify the data.
```sql
-- Renaming a column
ALTER TABLE employees RENAME COLUMN department TO dept;
INSERT INTO employees (id, name, age, salary, dept, address) VALUES (3, 'Vinod', 35, 40000.00, 'HR', 'Andhrapradesh');
SELECT * FROM employees;
-- Dropping a column
ALTER TABLE employees DROP COLUMN dept;
INSERT INTO employees (id, name, age, salary, address) VALUES (4, 'Manjo', 35, 40000.00, 'Chennai');
SELECT * FROM employees;
-- Changing a column type
ALTER TABLE employees ALTER COLUMN age TYPE bigint;
INSERT INTO employees (id, name, age, salary, address) VALUES (5, 'Raja', 59, 70000.00, 'Bangalore');
SELECT * FROM employees;
```