rangareddy commented on issue #11803:
URL: https://github.com/apache/hudi/issues/11803#issuecomment-2352435271

   Hi @Armelabdelkbir 
   
   I have tested your scenario with following code. drop column also worked 
with out any issues. 
   
   Step1: Create a database and table in postgres
   
   ```
   CREATE DATABASE cdc_hudi;
   
   \connect cdc_hudi;
   
   CREATE TABLE employees (
       id INT PRIMARY KEY NOT NULL,
       name VARCHAR(100) NOT NULL,
       age INT,
       salary DECIMAL(10, 2),
       department VARCHAR(50)
   );
   
   ALTER TABLE employees REPLICA IDENTITY FULL;
   ```
   
   Step2: Configure the debezium source connector for postgres db.
   
   `vi debezium-source-postgres.json`
   
   ```json
   {
     "name": "postgres-debezium-connector",
     "config": {
       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
       "tasks.max": "1",
       "plugin.name": "pgoutput",
       "database.hostname": "localhost",
       "database.port": "5432",
       "database.user": "postgres",
       "database.password": "postgres",
       "database.dbname": "cdc_hudi",
       "topic.prefix": "cdc_topic",
       "database.server.name": "postgres",
       "schema.include.list": "public",
       "table.include.list": "public.employees",
       "publication.autocreate.mode": "filtered",
       "tombstones.on.delete": "false",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "http://localhost:8081/";,
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter.schema.registry.url": "http://localhost:8081/";,
       "slot.name": "pgslot1"
     }
   }
   ```
   
   Step3: Create the `postgres-debezium-connector` connector using above 
configuration and verify the status
   
   ```sh
   curl -s -X POST -H 'Accept: application/json' -H 
"Content-Type:application/json" \
    -d @debezium-source-postgres.json http://localhost:8083/connectors/ | jq
   
   curl -s -X GET -H "Content-Type:application/json" 
http://localhost:8083/connectors/postgres-debezium-connector/status | jq
   ```
   
   Step4: Insert one record in postgres db and verify kafka topic is created or 
not
   
   ```sql
   INSERT INTO employees (id, name, age, salary, department) VALUES (1, 
'Ranga', 30, 50000.00, 'Sales');
   SELECT * FROM employees;
   ```
   
   ```
   $CONFLUENT_HOME/bin/kafka-topics --list --bootstrap-server localhost:9092 | 
grep employees
   cdc_topic.public.employees
   ```
   
   Step5: Run the Kafka Consumer to verify the data is consumed or not.
   
   ```
   $CONFLUENT_HOME/bin/kafka-avro-console-consumer \
     --bootstrap-server localhost:9092 \
     --from-beginning \
     --topic cdc_topic.public.employees
   ```
   
   Step6: Run the Spark application
   
   ```sh
   HUDI_HOME=/Install/apache/hudi
   HUDI_UTILITIES_JAR=$(ls 
$HUDI_HOME//packaging/hudi-utilities-bundle/target/hudi-utilities-bundle*.jar | 
grep -v sources | grep -v tests)
   
   $SPARK_HOME/bin/spark-submit \
     --master "local[2]" \
     --deploy-mode client \
     --conf 'spark.hadoop.spark.sql.legacy.parquet.nanosAsLong=false' \
     --conf 'spark.hadoop.spark.sql.parquet.binaryAsString=false' \
     --conf 'spark.hadoop.spark.sql.parquet.int96AsTimestamp=true' \
     --conf 'spark.hadoop.spark.sql.caseSensitive=false' \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
$HUDI_UTILITIES_JAR \
     --hoodie-conf hoodie.datasource.write.recordkey.field=id \
     --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
 \
     --hoodie-conf bootstrap.servers=localhost:9092 \
     --hoodie-conf schema.registry.url=http://localhost:8081 \
     --hoodie-conf 
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/cdc_topic.public.employees-value/versions/latest
 \
     --hoodie-conf 
hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
 \
     --hoodie-conf 
hoodie.deltastreamer.source.kafka.topic=cdc_topic.public.employees \
     --hoodie-conf auto.offset.reset=earliest \
     --hoodie-conf 
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true \
     --table-type MERGE_ON_READ \
     --op UPSERT \
     --target-base-path file:///tmp/debezium/postgres/employees \
     --target-table employees_cdc  \
     --source-class 
org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
     --source-ordering-field _event_lsn \
     --payload-class 
org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
     --continuous \
     --min-sync-interval-seconds 60
   ```
   
   Step7: Perform the schema evaluation operations and verify the data.
   
   ```sql
   -- Adding Columns
   ALTER TABLE employees ADD COLUMN address VARCHAR;
   
   INSERT INTO employees (id, name, age, salary, department, address) VALUES 
(2, 'Nishanth', 30, 50000.00, 'Sales', 'Bangalore');
   
   SELECT * FROM employees;
   ```
   
   Step8: Launch another spark job and verify the data
   
   ```sh
   export SPARK_VERSION=3.5
   $SPARK_HOME/bin/spark-shell \
     --master "local[2]" \
     --deploy-mode client \
     --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
     --conf 'spark.hadoop.spark.sql.legacy.parquet.nanosAsLong=false' \
     --conf 'spark.hadoop.spark.sql.parquet.binaryAsString=false' \
     --conf 'spark.hadoop.spark.sql.parquet.int96AsTimestamp=true' \
     --conf 'spark.hadoop.spark.sql.caseSensitive=false' \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf 
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
 \
     --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
   ```
   
   ```sh
   val basePath="file:///tmp/debezium/postgres/employees"
   spark.read.format("hudi").load(basePath).show(false)
   ```
   
   Step8: You can repeat further steps and verify the data.
   
   ```
   -- Renaming a column
   ALTER TABLE employees RENAME COLUMN department TO dept;
   
   INSERT INTO employees (id, name, age, salary, dept, address) VALUES (3, 
'Vinod', 35, 40000.00, 'HR', 'Andhrapradesh');
   
   SELECT * FROM employees;
   
   ALTER TABLE employees DROP column dept;
   
   INSERT INTO employees (id, name, age, salary, address) VALUES (4, 'Manjo', 
35, 40000.00, 'Chennai');
   
   SELECT * FROM employees;
   
   -- Type change column
   ALTER TABLE employees ALTER COLUMN age TYPE bigint;
   
   INSERT INTO employees (id, name, age, salary, address) VALUES (5, 'Raja', 
59, 70000.00, 'Bangalore');
   
   SELECT * FROM employees;
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to