Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1550062066

   @the-other-tim-brown Below I've laid out some information that you might 
find helpful ...
   
   ### DB Table I'm Testing With in PostgreSQL
   
    ```
    \d samser_customers
                                          Table "public.samser_customers"
       Column   |            Type             | Collation | Nullable |                   Default
    ------------+-----------------------------+-----------+----------+----------------------------------------------
     id         | integer                     |           | not null | nextval('samser_customers_id_seq'::regclass)
     name       | character varying(50)       |           | not null |
     age        | integer                     |           | not null |
     created_at | timestamp without time zone |           |          |
     event_ts   | bigint                      |           |          |
    Indexes:
        "samser_customers_pkey" PRIMARY KEY, btree (id)
    Referenced by:
        TABLE "samser_orders" CONSTRAINT "fk_customer" FOREIGN KEY (customer_id) REFERENCES samser_customers(id)
    Publications:
        "dbz_publication"
    ```
   
   The dummy data currently in the table:
   
   ```
   SELECT * FROM samser_customers;
    id |   name   | age |         created_at         |   event_ts
   ----+----------+-----+----------------------------+---------------
     1 | Bob      |  40 | 2023-05-12 00:05:27.204463 | 1681984800000
     2 | Alice    |  30 | 2023-05-12 00:05:27.204463 | 1681988400000
     3 | John     |  37 | 2023-05-12 00:05:27.204463 | 1681992000000
     4 | Jon      |  25 | 2023-05-12 00:05:27.204463 | 1681995600000
     5 | David    |  20 | 2023-05-12 00:05:27.204463 | 1681999200000
     6 | Jack     |  70 | 2023-05-12 00:05:27.204463 | 1682002800000
     8 | Ali      |  35 | 2023-05-15 18:03:11.675954 | 1683712800000
     9 | Jonathan |  30 | 2023-05-15 18:03:11.675954 | 1683716400000
    10 | Daniel   |  37 | 2023-05-15 18:03:11.675954 | 1683720000000
    11 | Taylor   |  25 | 2023-05-15 18:03:11.675954 | 1683723600000
    12 | Lex      |  32 | 2023-05-15 18:03:11.675954 | 1683727200000
    13 | Shane    |  45 | 2023-05-15 18:03:11.675954 | 1683730800000
   (12 rows)
   ```
   
   ### Debezium Setup
   
   I'm leveraging the [**Strimzi K8S Kafka Operator**](https://strimzi.io/), which includes `KafkaConnect` in its suite. I then build a Docker image, as instructed in [this doc](https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/), to create/set up the Debezium connector within that KafkaConnect.
   
   I also include **Confluent's relevant JARs** in that image under the `libs` 
directory (more on Debezium <> Confluent 
[here](https://debezium.io/documentation/reference/2.1/configuration/avro.html#confluent-schema-registry)).
 Essentially this is my `Dockerfile`:
   
   ```Dockerfile
   FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
   USER root:root
   RUN mkdir -p /opt/kafka/plugins/debezium
   RUN mkdir -p /opt/kafka/libs
   COPY ./debezium-connector-postgres-2.2.0.Final/ /opt/kafka/plugins/debezium/
   COPY ./confluent-jars-7.3.3/*.jar /opt/kafka/libs/
   USER 1001
   ```
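
   For completeness, here's a sketch of the Strimzi `KafkaConnect` custom resource that would point at this image, roughly following the linked blog post. This is only a sketch, not my exact manifest; the resource name, image URL, and bootstrap address below are placeholders/assumptions:

   ```yaml
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnect
   metadata:
     name: debezium-connect-cluster        # hypothetical name
     annotations:
       # Lets Strimzi manage connectors via KafkaConnector resources
       strimzi.io/use-connector-resources: "true"
   spec:
     version: 3.4.0
     replicas: 1
     # The custom image built from the Dockerfile above (placeholder URL)
     image: <MY_REGISTRY>/strimzi-connect-debezium:latest
     bootstrapServers: <KAFKA_CLUSTER>-kafka-bootstrap:9092
     config:
       group.id: connect-cluster
       offset.storage.topic: connect-cluster-offsets
       config.storage.topic: connect-cluster-configs
       status.storage.topic: connect-cluster-status
   ```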
   
   ### Debezium Kafka Connector Configuration
   
   You can find the relevant details in [that same doc](https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/) under the section titled "Create The Connector". My Debezium configuration is laid out below, and the **inferred/registered schema** and **sample messages consumed** are shown in [my older comment](https://github.com/apache/hudi/issues/8519#issuecomment-1542967885), as you know.
   
   ```
   plugin.name: pgoutput
   database.hostname: <DB_HOST>
   database.port: 5432
   database.user: <DB_USERNAME>
   database.password: <DB_PWD>
   database.dbname : <DB_NAME>
   topic.prefix: <KAFKA_TOPIC_PREFIX>
   schema.include.list: public
   key.converter: io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   value.converter: io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   table.include.list: public.samser_customers
   topic.creation.enable: true
   topic.creation.default.replication.factor: 1
   topic.creation.default.partitions: 1
   topic.creation.default.cleanup.policy: compact
   topic.creation.default.compression.type: lz4
   decimal.handling.mode: double
   tombstones.on.delete: false
   ```
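
   For context, that flat config lives under `spec.config` of a Strimzi `KafkaConnector` resource, roughly like the sketch below. The connector name and cluster label here are placeholders (the `strimzi.io/cluster` label must match the `metadata.name` of the `KafkaConnect` resource):

   ```yaml
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnector
   metadata:
     name: samser-customers-connector      # hypothetical name
     labels:
       # Must match the metadata.name of the KafkaConnect resource
       strimzi.io/cluster: <KAFKA_CONNECT_CLUSTER_NAME>
   spec:
     class: io.debezium.connector.postgresql.PostgresConnector
     tasksMax: 1
     config:
       plugin.name: pgoutput
       database.hostname: <DB_HOST>
       database.port: 5432
       # ... the remaining key/value pairs from the block above
   ```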
   
   Thanks again @the-other-tim-brown for the help; I really appreciate it. Please let me know if there is anything else you need.

