This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
     new 82e9b3b279d [fix][docs] Update Debezium Postgres connector docs to v2.x properties and resolve schema mismatch (#1137)
82e9b3b279d is described below

commit 82e9b3b279dfa9a3722460aba0e7138e02e6b202
Author: Praveen Kumar <[email protected]>
AuthorDate: Mon May 4 11:47:37 2026 +0530

    [fix][docs] Update Debezium Postgres connector docs to v2.x properties and resolve schema mismatch (#1137)
    
    * [fix][docs] Update Debezium Postgres docs to v2.x properties and resolve silent data drop
    
    * Apply changes to versioned_docs for 4.2.x
    
    ---------
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 docs/io-cdc-debezium.md                            |  76 +++++++--------
 docs/io-debezium-source.md                         | 104 ++++++++++-----------
 versioned_docs/version-4.2.x/io-cdc-debezium.md    |  76 +++++++--------
 versioned_docs/version-4.2.x/io-debezium-source.md | 104 ++++++++++-----------
 4 files changed, 178 insertions(+), 182 deletions(-)

diff --git a/docs/io-cdc-debezium.md b/docs/io-cdc-debezium.md
index 04e356664f4..f26fa2598d6 100644
--- a/docs/io-cdc-debezium.md
+++ b/docs/io-cdc-debezium.md
@@ -16,28 +16,28 @@ The Debezium source connector pulls messages from MySQL or 
PostgreSQL and persis
 
 The configuration of the Debezium source connector has the following 
properties.
 
-| Name | Required | Default | Description |
-|------|----------|---------|-------------|
-| `task.class` | true | null | A source task class that is implemented in 
Debezium. |
-| `database.hostname` | true | null | The address of a database server. |
-| `database.port` | true | null | The port number of a database server.|
-| `database.user` | true | null | The name of a database user that has the 
required privileges. |
-| `database.password` | true | null | The password for a database user that 
has the required privileges. |
-| `database.server.id` | true | null | The connector's identifier that must be 
unique within a database cluster and similar to the database's server-id 
configuration property. |
-| `database.server.name` | true | null | The logical name of a database 
server/cluster, which forms a namespace and is used in all the names of Kafka 
topics to which the connector writes, the Kafka Connect schema names, and the 
namespaces of the corresponding Avro schema when the Avro Connector is used. |
-| `database.whitelist` | false | null | A list of all databases hosted by this 
server that is monitored by the connector.<br /><br /> This is optional, and 
there are other properties for listing databases and tables to include or 
exclude from monitoring. |
-| `key.converter` | true | null | The converter provided by Kafka Connect to 
convert the record key. |
-| `value.converter` | true | null | The converter provided by Kafka Connect to 
convert the record value.  |
-| `database.history` | true | null | The name of the database history class. |
-| `database.history.pulsar.topic` | true | null | The name of the database 
history topic where the connector writes and recovers DDL statements. <br /><br 
/>**Note: this topic is for internal use only and should not be used by 
consumers.** |
+| Name                                  | Required | Default | Description |
+|---------------------------------------|----------|---------|-------------|
+| `task.class`                          | true | null | A source task class 
that is implemented in Debezium. |
+| `database.hostname`                   | true | null | The address of a 
database server. |
+| `database.port`                       | true | null | The port number of a 
database server.|
+| `database.user`                       | true | null | The name of a database 
user that has the required privileges. |
+| `database.password`                   | true | null | The password for a 
database user that has the required privileges. |
+| `database.server.id`                  | true | null | The connector's 
identifier that must be unique within a database cluster and similar to the 
database's server-id configuration property. |
+| `topic.prefix`                        | true | null | The logical name of a 
database server/cluster, which forms a namespace and is used in all the names 
of Kafka topics to which the connector writes, the Kafka Connect schema names, 
and the namespaces of the corresponding Avro schema when the Avro Connector is 
used. |
+| `database.include.list`               | false | null | A list of all 
databases hosted by this server that is monitored by the connector.<br /><br /> 
This is optional, and there are other properties for listing databases and 
tables to include or exclude from monitoring. |
+| `key.converter`                       | true | null | The converter provided 
by Kafka Connect to convert the record key. |
+| `value.converter`                     | true | null | The converter provided 
by Kafka Connect to convert the record value.  |
+| `database.history`                    | true | null | The name of the 
database history class. |
+| `database.history.pulsar.topic`       | true | null | The name of the 
database history topic where the connector writes and recovers DDL statements. 
<br /><br />**Note: this topic is for internal use only and should not be used 
by consumers.** |
 | `database.history.pulsar.service.url` | false| null | Pulsar cluster service 
URL for history topic. <br /><br />**Note**: If 
`database.history.pulsar.service.url` is not set, then the database history 
Pulsar client will use the same client settings as those of the source 
connector, such as `client_auth_plugin` and `client_auth_params`.|
-| `pulsar.service.url` | true | null | Pulsar cluster service URL for the 
offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url 
http://pulsar:8080 sources localrun --source-config-file 
$PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar 
cluster.|
-| `offset.storage.topic` | true | null | Record the last committed offsets 
that the connector successfully completes. |
-| `mongodb.hosts` | true | null | The comma-separated list of hostname and 
port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the 
replica set. The list contains a single hostname and a port pair. If 
mongodb.members.auto.discover is set to false, the host and port pair are 
prefixed with the replica set name (e.g., rs0/localhost:27017). |
-| `mongodb.name` | true | null | A unique name that identifies the connector 
and/or MongoDB replica set or shared cluster that this connector monitors. Each 
server should be monitored by at most one Debezium connector, since this server 
name prefixes all persisted Kafka topics emanating from the MongoDB replica set 
or cluster. |
-| `mongodb.user` | true | null | Name of the database user to be used when 
connecting to MongoDB. This is required only when MongoDB is configured to use 
authentication. |
-| `mongodb.password` | true | null | Password to be used when connecting to 
MongoDB. This is required only when MongoDB is configured to use 
authentication. |
-| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that 
attempts to use a separate task for each replica set. |
+| `pulsar.service.url`                  | true | null | Pulsar cluster service 
URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin 
--admin-url http://pulsar:8080 sources localrun --source-config-file 
$PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar 
cluster.|
+| `offset.storage.topic`                | true | null | Record the last 
committed offsets that the connector successfully completes. |
+| `mongodb.hosts`                       | true | null | The comma-separated 
list of hostname and port pairs (in the form 'host' or 'host:port') of the 
MongoDB servers in the replica set. The list contains a single hostname and a 
port pair. If mongodb.members.auto.discover is set to false, the host and port 
pair are prefixed with the replica set name (e.g., rs0/localhost:27017). |
+| `mongodb.name`                        | true | null | A unique name that 
identifies the connector and/or MongoDB replica set or shared cluster that this 
connector monitors. Each server should be monitored by at most one Debezium 
connector, since this server name prefixes all persisted Kafka topics emanating 
from the MongoDB replica set or cluster. |
+| `mongodb.user`                        | true | null | Name of the database 
user to be used when connecting to MongoDB. This is required only when MongoDB 
is configured to use authentication. |
+| `mongodb.password`                    | true | null | Password to be used 
when connecting to MongoDB. This is required only when MongoDB is configured to 
use authentication. |
+| `mongodb.task.id`                     | true | null | The taskId of the 
MongoDB connector that attempts to use a separate task for each replica set. |
 
 
 
@@ -59,13 +59,13 @@ You can use one of the following methods to create a 
configuration file.
         "database.user": "debezium",
         "database.password": "dbz",
         "database.server.id": "184054",
-        "database.server.name": "dbserver1",
-        "database.whitelist": "inventory",
+        "topic.prefix": "dbserver1",
+        "database.include.list": "inventory",
         "database.history": 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
         "database.history.pulsar.topic": "history-topic",
         "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
-        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
         "pulsar.service.url": "pulsar://127.0.0.1:6650",
         "offset.storage.topic": "offset-topic"
      }
@@ -92,15 +92,15 @@ You can use one of the following methods to create a 
configuration file.
       database.user: "debezium"
       database.password: "dbz"
       database.server.id: "184054"
-      database.server.name: "dbserver1"
-      database.whitelist: "inventory"
+      topic.prefix: "dbserver1"
+      database.include.list: "inventory"
       database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
       database.history.pulsar.topic: "history-topic"
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
 
       ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
-      key.converter: "org.apache.kafka.connect.json.JsonConverter"
-      value.converter: "org.apache.kafka.connect.json.JsonConverter"
+      key.converter: "org.apache.kafka.connect.storage.StringConverter"
+      value.converter: "org.apache.kafka.connect.storage.StringConverter"
 
       ## PULSAR_SERVICE_URL_CONFIG
       pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -142,7 +142,7 @@ This example shows how to change the data of a MySQL table 
using the Pulsar Debe
        --name debezium-mysql-source \
        --tenant public \
        --namespace default \
-       --source-config '{"database.hostname": "localhost","database.port": 
"3306","database.user": "debezium","database.password": 
"dbz","database.server.id": "184054","database.server.name": 
"dbserver1","database.whitelist": "inventory","database.history": 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic":
 "history-topic","database.history.pulsar.service.url": 
"pulsar://127.0.0.1:6650","key.converter": 
"org.apache.kafka.connect.json.JsonConverter","va [...]
+       --source-config '{"database.hostname": "localhost","database.port": 
"3306","database.user": "debezium","database.password": 
"dbz","database.server.id": "184054","topic.prefix": 
"dbserver1","database.include.list": "inventory","database.history": 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic":
 "history-topic","database.history.pulsar.service.url": 
"pulsar://127.0.0.1:6650","key.converter": 
"org.apache.kafka.connect.storage.StringConverter","va [...]
        ```
 
    * Use the **YAML** configuration file as shown previously.
@@ -229,8 +229,8 @@ You can use one of the following methods to create a 
configuration file.
         "database.user": "postgres",
         "database.password": "postgres",
         "database.dbname": "postgres",
-        "database.server.name": "dbserver1",
-        "schema.whitelist": "inventory",
+        "topic.prefix": "dbserver1",
+        "schema.include.list": "inventory",
         "pulsar.service.url": "pulsar://127.0.0.1:6650"
      }
   }
@@ -256,8 +256,8 @@ You can use one of the following methods to create a 
configuration file.
       database.user: "postgres"
       database.password: "postgres"
       database.dbname: "postgres"
-      database.server.name: "dbserver1"
-      schema.whitelist: "inventory"
+      topic.prefix: "dbserver1"
+      schema.include.list: "inventory"
 
       ## PULSAR_SERVICE_URL_CONFIG
       pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -293,7 +293,7 @@ This example shows how to change the data of a PostgreSQL 
table using the Pulsar
        --name debezium-postgres-source \
        --tenant public \
        --namespace default \
-       --source-config '{"database.hostname": "localhost","database.port": 
"5432","database.user": "postgres","database.password": 
"postgres","database.dbname": "postgres","database.server.name": 
"dbserver1","plugin.name": "pgoutput","schema.whitelist": 
"inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
+       --source-config '{"database.hostname": "localhost","database.port": 
"5432","database.user": "postgres","database.password": 
"postgres","database.dbname": "postgres","topic.prefix": 
"dbserver1","plugin.name": "pgoutput","schema.include.list": 
"inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
        ```
 
    * Use the **YAML** configuration file as shown previously.
@@ -364,7 +364,7 @@ You need to create a configuration file before using the 
Pulsar Debezium connect
         "mongodb.user": "debezium",
         "mongodb.password": "dbz",
         "mongodb.task.id": "1",
-        "database.whitelist": "inventory",
+        "database.include.list": "inventory",
         "pulsar.service.url": "pulsar://127.0.0.1:6650"
      }
   }
@@ -390,7 +390,7 @@ You need to create a configuration file before using the 
Pulsar Debezium connect
       mongodb.user: "debezium"
       mongodb.password: "dbz"
       mongodb.task.id: "1"
-      database.whitelist: "inventory"
+      database.include.list: "inventory"
 
       ## PULSAR_SERVICE_URL_CONFIG
       pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -435,7 +435,7 @@ This example shows how to change the data of a MongoDB 
table using the Pulsar De
        --name debezium-mongodb-source \
        --tenant public \
        --namespace default \
-       --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": 
"dbserver1","mongodb.user": "debezium","mongodb.password": 
"dbz","mongodb.task.id": "1","database.whitelist": 
"inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
+       --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": 
"dbserver1","mongodb.user": "debezium","mongodb.password": 
"dbz","mongodb.task.id": "1","database.include.list": 
"inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
        ```
 
    * Use the **YAML** configuration file as shown previously.
diff --git a/docs/io-debezium-source.md b/docs/io-debezium-source.md
index f5fdce1a4be..9c60c5586e9 100644
--- a/docs/io-debezium-source.md
+++ b/docs/io-debezium-source.md
@@ -16,25 +16,25 @@ The Debezium source connector pulls messages from MySQL or 
PostgreSQL and persis
 
 The configuration of the Debezium source connector has the following 
properties.
 
-| Name | Required | Default | Description |
-|------|----------|---------|-------------|
-| `task.class` | true | null | A source task class that implemented in 
Debezium. |
-| `database.hostname` | true | null | The address of a database server. |
-| `database.port` | true | null | The port number of a database server.|
-| `database.user` | true | null | The name of a database user that has the 
required privileges. |
-| `database.password` | true | null | The password for a database user that 
has the required privileges. |
-| `database.server.id` | true | null | The connector's identifier that must be 
unique within a database cluster and similar to the database's server-id 
configuration property. |
-| `database.server.name` | true | null | The logical name of a database 
server/cluster, which forms a namespace and it is used in all the names of 
Kafka topics to which the connector writes, the Kafka Connect schema names, and 
the namespaces of the corresponding Avro schema when the Avro Connector is 
used. |
-| `database.whitelist` | false | null | A list of all databases hosted by this 
server which is monitored by the  connector.<br /><br /> This is optional, and 
there are other properties for listing databases and tables to include or 
exclude from monitoring. |
-| `key.converter` | true | null | The converter provided by Kafka Connect to 
convert record key. |
-| `value.converter` | true | null | The converter provided by Kafka Connect to 
convert record value.  |
-| `database.history` | true | null | The name of the database history class. |
-| `database.history.pulsar.topic` | true | null | The name of the database 
history topic where the connector writes and recovers DDL statements. <br /><br 
/>**Note: this topic is for internal use only and should not be used by 
consumers.** |
-| `database.history.pulsar.service.url` | false| null | Pulsar cluster service 
URL for history topic. <br /><br />**Note**: If 
`database.history.pulsar.service.url` is not set, then the database history 
Pulsar client will use the same client settings as those of the source 
connector, such as `client_auth_plugin` and `client_auth_params`.|
-| `offset.storage.topic` | true | null | Record the last committed offsets 
that the connector successfully completes. |
-| `json-with-envelope` | false | false | Present the message that only 
consists of payload. |
+| Name                                    | Required | Default | Description |
+|-----------------------------------------|----------|---------|-------------|
+| `task.class`                            | true | null | A source task class 
that implemented in Debezium. |
+| `database.hostname`                     | true | null | The address of a 
database server. |
+| `database.port`                         | true | null | The port number of a 
database server.|
+| `database.user`                         | true | null | The name of a 
database user that has the required privileges. |
+| `database.password`                     | true | null | The password for a 
database user that has the required privileges. |
+| `database.server.id`                    | true | null | The connector's 
identifier that must be unique within a database cluster and similar to the 
database's server-id configuration property. |
+| `topic.prefix`                  | true | null | The logical name of a 
database server/cluster, which forms a namespace and it is used in all the 
names of Kafka topics to which the connector writes, the Kafka Connect schema 
names, and the namespaces of the corresponding Avro schema when the Avro 
Connector is used. |
+| `database.include.list`            | false | null | A list of all databases 
hosted by this server which is monitored by the  connector.<br /><br /> This is 
optional, and there are other properties for listing databases and tables to 
include or exclude from monitoring. |
+| `key.converter`                         | true | null | The converter 
provided by Kafka Connect to convert record key. |
+| `value.converter`                       | true | null | The converter 
provided by Kafka Connect to convert record value.  |
+| `database.history`                      | true | null | The name of the 
database history class. |
+| `database.history.pulsar.topic`         | true | null | The name of the 
database history topic where the connector writes and recovers DDL statements. 
<br /><br />**Note: this topic is for internal use only and should not be used 
by consumers.** |
+| `database.history.pulsar.service.url`   | false| null | Pulsar cluster 
service URL for history topic. <br /><br />**Note**: If 
`database.history.pulsar.service.url` is not set, then the database history 
Pulsar client will use the same client settings as those of the source 
connector, such as `client_auth_plugin` and `client_auth_params`.|
+| `offset.storage.topic`                  | true | null | Record the last 
committed offsets that the connector successfully completes. |
+| `json-with-envelope`                    | false | false | Present the 
message that only consists of payload. |
 | `database.history.pulsar.reader.config` | false | null | The configs of the 
reader for the database schema history topic, in the form of a JSON string with 
key-value pairs. |
-| `offset.storage.reader.config` | false | null | The configs of the reader 
for the kafka connector offsets topic, in the form of a JSON string with 
key-value pairs. |
+| `offset.storage.reader.config`          | false | null | The configs of the 
reader for the kafka connector offsets topic, in the form of a JSON string with 
key-value pairs. |
 
 ### Converter Options
 
@@ -100,13 +100,13 @@ You can use one of the following methods to create a 
configuration file.
         "database.user": "debezium",
         "database.password": "dbz",
         "database.server.id": "184054",
-        "database.server.name": "dbserver1",
-        "database.whitelist": "inventory",
+        "topic.prefix": "dbserver1",
+        "database.include.list": "inventory",
         "database.history": 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
         "database.history.pulsar.topic": "history-topic",
         "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
-        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
         "offset.storage.topic": "offset-topic"
      }
   }
@@ -132,15 +132,15 @@ You can use one of the following methods to create a 
configuration file.
       database.user: "debezium"
       database.password: "dbz"
       database.server.id: "184054"
-      database.server.name: "dbserver1"
-      database.whitelist: "inventory"
+      topic.prefix: "dbserver1"
+      database.include.list: "inventory"
       database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
       database.history.pulsar.topic: "history-topic"
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
 
       ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
-      key.converter: "org.apache.kafka.connect.json.JsonConverter"
-      value.converter: "org.apache.kafka.connect.json.JsonConverter"
+      key.converter: "org.apache.kafka.connect.storage.StringConverter"
+      value.converter: "org.apache.kafka.connect.storage.StringConverter"
 
       ## OFFSET_STORAGE_TOPIC_CONFIG
       offset.storage.topic: "offset-topic"
@@ -179,7 +179,7 @@ This example shows how to change the data of a MySQL table 
using the Pulsar Debe
            --name debezium-mysql-source \
            --tenant public \
            --namespace default \
-           --source-config '{"database.hostname": "localhost","database.port": 
"3306","database.user": "debezium","database.password": 
"dbz","database.server.id": "184054","database.server.name": 
"dbserver1","database.whitelist": "inventory","database.history": 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic":
 "history-topic","database.history.pulsar.service.url": 
"pulsar://127.0.0.1:6650","key.converter": 
"org.apache.kafka.connect.json.JsonConverter" [...]
+           --source-config '{"database.hostname": "localhost","database.port": 
"3306","database.user": "debezium","database.password": 
"dbz","database.server.id": "184054","topic.prefix": 
"dbserver1","database.include.list": "inventory","database.history": 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic":
 "history-topic","database.history.pulsar.service.url": 
"pulsar://127.0.0.1:6650","key.converter": 
"org.apache.kafka.connect.storage.StringConverter" [...]
        ```
 
      :::note
@@ -278,10 +278,10 @@ You can use one of the following methods to create a 
configuration file.
       "database.user": "postgres",
       "database.password": "changeme",
       "database.dbname": "postgres",
-      "database.server.name": "dbserver1",
+      "topic.prefix": "dbserver1",
       "plugin.name": "pgoutput",
-      "schema.whitelist": "public",
-      "table.whitelist": "public.users",
+      "schema.include.list": "public",
+      "table.include.list": "public.users",
       "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"
   }
   ```
@@ -305,10 +305,12 @@ You can use one of the following methods to create a 
configuration file.
       database.user: "postgres"
       database.password: "changeme"
       database.dbname: "postgres"
-      database.server.name: "dbserver1"
+      topic.prefix: "dbserver1"
       plugin.name: "pgoutput"
-      schema.whitelist: "public"
-      table.whitelist: "public.users"
+      schema.include.list: "public"
+      table.include.list: "public.users"
+      key.converter: "org.apache.kafka.connect.storage.StringConverter"
+      value.converter: "org.apache.kafka.connect.storage.StringConverter"
 
       ## PULSAR_SERVICE_URL_CONFIG
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -349,7 +351,7 @@ This example shows how to change the data of a PostgreSQL 
table using the Pulsar
            --name debezium-postgres-source \
            --tenant public \
            --namespace default \
-           --source-config '{"database.hostname": "localhost","database.port": 
"5432","database.user": "postgres","database.password": 
"changeme","database.dbname": "postgres","database.server.name": 
"dbserver1","plugin.name": "pgoutput","schema.whitelist": 
"public","table.whitelist": "public.users","pulsar.service.url": 
"pulsar://127.0.0.1:6650"}'
+           --source-config '{"database.hostname": "localhost","database.port": 
"5432","database.user": "postgres","database.password": 
"changeme","database.dbname": "postgres","topic.prefix": 
"dbserver1","plugin.name": "pgoutput","schema.include.list": 
"public","table.include.list": "public.users","key.converter": 
"org.apache.kafka.connect.storage.StringConverter","value.converter": 
"org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": 
"pulsar://127.0.0.1:6650"}'
        ```
 
      :::note
@@ -441,7 +443,7 @@ You can use one of the following methods to create a 
configuration file.
       "mongodb.user": "debezium",
       "mongodb.password": "dbz",
       "mongodb.task.id": "1",
-      "database.whitelist": "inventory",
+      "database.include.list": "inventory",
       "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"
   }
   ```
@@ -466,7 +468,7 @@ You can use one of the following methods to create a 
configuration file.
       mongodb.user: "debezium"
       mongodb.password: "dbz"
       mongodb.task.id: "1"
-      database.whitelist: "inventory"
+      database.include.list: "inventory"
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
   ```
 
@@ -509,7 +511,7 @@ This example shows how to change the data of a MongoDB 
table using the Pulsar De
            --name debezium-mongodb-source \
            --tenant public \
            --namespace default \
-           --source-config '{"mongodb.hosts": 
"rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": 
"debezium","mongodb.password": "dbz","mongodb.task.id": 
"1","database.whitelist": "inventory","database.history.pulsar.service.url": 
"pulsar://127.0.0.1:6650"}'
+           --source-config '{"mongodb.hosts": 
"rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": 
"debezium","mongodb.password": "dbz","mongodb.task.id": 
"1","database.include.list": "inventory","database.history.pulsar.service.url": 
"pulsar://127.0.0.1:6650"}'
        ```
 
      :::note
@@ -585,14 +587,13 @@ Using YAML as an example, you can create a 
`debezium-oracle-source-config.yaml`
   "database.user": "dbzuser",
   "database.password": "dbz",
   "database.dbname": "XE",
-  "database.server.name": "XE",
+  "topic.prefix": "XE",
   "schema.exclude.list": "system,dbzuser",
   "snapshot.mode": "initial",
   "topic.namespace": "public/default",
   "task.class": "io.debezium.connector.oracle.OracleConnectorTask",
-  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "typeClassName": "org.apache.pulsar.common.schema.KeyValue",
+  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
   "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
   "database.tcpKeepAlive": "true",
   "decimal.handling.mode": "double",
@@ -619,14 +620,13 @@ configs:
     database.user: "dbzuser"
     database.password: "dbz"
     database.dbname: "XE"
-    database.server.name: "XE"
+    topic.prefix: "XE"
     schema.exclude.list: "system,dbzuser"
     snapshot.mode: "initial"
     topic.namespace: "public/default"
     task.class: "io.debezium.connector.oracle.OracleConnectorTask"
-    value.converter: "org.apache.kafka.connect.json.JsonConverter"
-    key.converter: "org.apache.kafka.connect.json.JsonConverter"
-    typeClassName: "org.apache.pulsar.common.schema.KeyValue"
+    key.converter: "org.apache.kafka.connect.storage.StringConverter"
+    value.converter: "org.apache.kafka.connect.storage.StringConverter"
     database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
     database.tcpKeepAlive: "true"
     decimal.handling.mode: "double"
@@ -655,13 +655,12 @@ Similarly to other connectors, you can use JSON or YAML 
to configure the connect
   "database.user": "sa",
   "database.password": "MyP@ssw0rd!",
   "database.dbname": "MyTestDB",
-  "database.server.name": "mssql",
+  "topic.prefix": "mssql",
   "snapshot.mode": "schema_only",
   "topic.namespace": "public/default",
   "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
-  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "typeClassName": "org.apache.pulsar.common.schema.KeyValue",
+  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
   "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
   "database.tcpKeepAlive": "true",
   "decimal.handling.mode": "double",
@@ -688,13 +687,12 @@ configs:
     database.user: "sa"
     database.password: "MyP@ssw0rd!"
     database.dbname: "MyTestDB"
-    database.server.name: "mssql"
+    topic.prefix: "mssql"
     snapshot.mode: "schema_only"
     topic.namespace: "public/default"
     task.class: "io.debezium.connector.sqlserver.SqlServerConnectorTask"
-    value.converter: "org.apache.kafka.connect.json.JsonConverter"
-    key.converter: "org.apache.kafka.connect.json.JsonConverter"
-    typeClassName: "org.apache.pulsar.common.schema.KeyValue"
+    key.converter: "org.apache.kafka.connect.storage.StringConverter"
+    value.converter: "org.apache.kafka.connect.storage.StringConverter"
     database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
     database.tcpKeepAlive: "true"
     decimal.handling.mode: "double"
diff --git a/versioned_docs/version-4.2.x/io-cdc-debezium.md b/versioned_docs/version-4.2.x/io-cdc-debezium.md
index 04e356664f4..f26fa2598d6 100644
--- a/versioned_docs/version-4.2.x/io-cdc-debezium.md
+++ b/versioned_docs/version-4.2.x/io-cdc-debezium.md
@@ -16,28 +16,28 @@ The Debezium source connector pulls messages from MySQL or PostgreSQL and persis
 
 The configuration of the Debezium source connector has the following properties.
 
-| Name | Required | Default | Description |
-|------|----------|---------|-------------|
-| `task.class` | true | null | A source task class that is implemented in Debezium. |
-| `database.hostname` | true | null | The address of a database server. |
-| `database.port` | true | null | The port number of a database server.|
-| `database.user` | true | null | The name of a database user that has the required privileges. |
-| `database.password` | true | null | The password for a database user that has the required privileges. |
-| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. |
-| `database.server.name` | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
-| `database.whitelist` | false | null | A list of all databases hosted by this server that is monitored by the connector.<br /><br /> This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
-| `key.converter` | true | null | The converter provided by Kafka Connect to convert the record key. |
-| `value.converter` | true | null | The converter provided by Kafka Connect to convert the record value.  |
-| `database.history` | true | null | The name of the database history class. |
-| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements. <br /><br />**Note: this topic is for internal use only and should not be used by consumers.** |
+| Name                                  | Required | Default | Description |
+|---------------------------------------|----------|---------|-------------|
+| `task.class`                          | true | null | A source task class that is implemented in Debezium. |
+| `database.hostname`                   | true | null | The address of a database server. |
+| `database.port`                       | true | null | The port number of a database server.|
+| `database.user`                       | true | null | The name of a database user that has the required privileges. |
+| `database.password`                   | true | null | The password for a database user that has the required privileges. |
+| `database.server.id`                  | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. |
+| `topic.prefix`                        | true | null | The logical name of a database server/cluster, which forms a namespace and is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
+| `database.include.list`               | false | null | A list of all databases hosted by this server that is monitored by the connector.<br /><br /> This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
+| `key.converter`                       | true | null | The converter provided by Kafka Connect to convert the record key. |
+| `value.converter`                     | true | null | The converter provided by Kafka Connect to convert the record value.  |
+| `database.history`                    | true | null | The name of the database history class. |
+| `database.history.pulsar.topic`       | true | null | The name of the database history topic where the connector writes and recovers DDL statements. <br /><br />**Note: this topic is for internal use only and should not be used by consumers.** |
 | `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic. <br /><br />**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.|
-| `pulsar.service.url` | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.|
-| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. |
-| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). |
-| `mongodb.name` | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. |
-| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
-| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
-| `mongodb.task.id` | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. |
+| `pulsar.service.url`                  | true | null | Pulsar cluster service URL for the offset topic used in Debezium. You can use the `bin/pulsar-admin --admin-url http://pulsar:8080 sources localrun --source-config-file $PWD/configs/pg-pulsar-config.yaml` command to point to the target Pulsar cluster.|
+| `offset.storage.topic`                | true | null | Record the last committed offsets that the connector successfully completes. |
+| `mongodb.hosts`                       | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017). |
+| `mongodb.name`                        | true | null | A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. |
+| `mongodb.user`                        | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
+| `mongodb.password`                    | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
+| `mongodb.task.id`                     | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. |
 
 
 
@@ -59,13 +59,13 @@ You can use one of the following methods to create a configuration file.
         "database.user": "debezium",
         "database.password": "dbz",
         "database.server.id": "184054",
-        "database.server.name": "dbserver1",
-        "database.whitelist": "inventory",
+        "topic.prefix": "dbserver1",
+        "database.include.list": "inventory",
         "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
         "database.history.pulsar.topic": "history-topic",
         "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
-        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
         "pulsar.service.url": "pulsar://127.0.0.1:6650",
         "offset.storage.topic": "offset-topic"
      }
@@ -92,15 +92,15 @@ You can use one of the following methods to create a configuration file.
       database.user: "debezium"
       database.password: "dbz"
       database.server.id: "184054"
-      database.server.name: "dbserver1"
-      database.whitelist: "inventory"
+      topic.prefix: "dbserver1"
+      database.include.list: "inventory"
       database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
       database.history.pulsar.topic: "history-topic"
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
 
       ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
-      key.converter: "org.apache.kafka.connect.json.JsonConverter"
-      value.converter: "org.apache.kafka.connect.json.JsonConverter"
+      key.converter: "org.apache.kafka.connect.storage.StringConverter"
+      value.converter: "org.apache.kafka.connect.storage.StringConverter"
 
       ## PULSAR_SERVICE_URL_CONFIG
       pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -142,7 +142,7 @@ This example shows how to change the data of a MySQL table using the Pulsar Debe
        --name debezium-mysql-source \
        --tenant public \
        --namespace default \
-       --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter","va [...]
+       --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.storage.StringConverter","va [...]
        ```
 
    * Use the **YAML** configuration file as shown previously.
@@ -229,8 +229,8 @@ You can use one of the following methods to create a configuration file.
         "database.user": "postgres",
         "database.password": "postgres",
         "database.dbname": "postgres",
-        "database.server.name": "dbserver1",
-        "schema.whitelist": "inventory",
+        "topic.prefix": "dbserver1",
+        "schema.include.list": "inventory",
         "pulsar.service.url": "pulsar://127.0.0.1:6650"
      }
   }
@@ -256,8 +256,8 @@ You can use one of the following methods to create a configuration file.
       database.user: "postgres"
       database.password: "postgres"
       database.dbname: "postgres"
-      database.server.name: "dbserver1"
-      schema.whitelist: "inventory"
+      topic.prefix: "dbserver1"
+      schema.include.list: "inventory"
 
       ## PULSAR_SERVICE_URL_CONFIG
       pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -293,7 +293,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar
        --name debezium-postgres-source \
        --tenant public \
        --namespace default \
-       --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","database.server.name": "dbserver1","plugin.name": "pgoutput","schema.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
+       --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","topic.prefix": "dbserver1","plugin.name": "pgoutput","schema.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
        ```
 
    * Use the **YAML** configuration file as shown previously.
@@ -364,7 +364,7 @@ You need to create a configuration file before using the Pulsar Debezium connect
         "mongodb.user": "debezium",
         "mongodb.password": "dbz",
         "mongodb.task.id": "1",
-        "database.whitelist": "inventory",
+        "database.include.list": "inventory",
         "pulsar.service.url": "pulsar://127.0.0.1:6650"
      }
   }
@@ -390,7 +390,7 @@ You need to create a configuration file before using the Pulsar Debezium connect
       mongodb.user: "debezium"
       mongodb.password: "dbz"
       mongodb.task.id: "1"
-      database.whitelist: "inventory"
+      database.include.list: "inventory"
 
       ## PULSAR_SERVICE_URL_CONFIG
       pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -435,7 +435,7 @@ This example shows how to change the data of a MongoDB table using the Pulsar De
        --name debezium-mongodb-source \
        --tenant public \
        --namespace default \
-       --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
+       --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.include.list": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
        ```
 
    * Use the **YAML** configuration file as shown previously.
diff --git a/versioned_docs/version-4.2.x/io-debezium-source.md b/versioned_docs/version-4.2.x/io-debezium-source.md
index f5fdce1a4be..9c60c5586e9 100644
--- a/versioned_docs/version-4.2.x/io-debezium-source.md
+++ b/versioned_docs/version-4.2.x/io-debezium-source.md
@@ -16,25 +16,25 @@ The Debezium source connector pulls messages from MySQL or PostgreSQL and persis
 
 The configuration of the Debezium source connector has the following properties.
 
-| Name | Required | Default | Description |
-|------|----------|---------|-------------|
-| `task.class` | true | null | A source task class that implemented in Debezium. |
-| `database.hostname` | true | null | The address of a database server. |
-| `database.port` | true | null | The port number of a database server.|
-| `database.user` | true | null | The name of a database user that has the required privileges. |
-| `database.password` | true | null | The password for a database user that has the required privileges. |
-| `database.server.id` | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. |
-| `database.server.name` | true | null | The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
-| `database.whitelist` | false | null | A list of all databases hosted by this server which is monitored by the  connector.<br /><br /> This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
-| `key.converter` | true | null | The converter provided by Kafka Connect to convert record key. |
-| `value.converter` | true | null | The converter provided by Kafka Connect to convert record value.  |
-| `database.history` | true | null | The name of the database history class. |
-| `database.history.pulsar.topic` | true | null | The name of the database history topic where the connector writes and recovers DDL statements. <br /><br />**Note: this topic is for internal use only and should not be used by consumers.** |
-| `database.history.pulsar.service.url` | false| null | Pulsar cluster service URL for history topic. <br /><br />**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.|
-| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. |
-| `json-with-envelope` | false | false | Present the message that only consists of payload. |
+| Name                                    | Required | Default | Description |
+|-----------------------------------------|----------|---------|-------------|
+| `task.class`                            | true | null | A source task class that implemented in Debezium. |
+| `database.hostname`                     | true | null | The address of a database server. |
+| `database.port`                         | true | null | The port number of a database server.|
+| `database.user`                         | true | null | The name of a database user that has the required privileges. |
+| `database.password`                     | true | null | The password for a database user that has the required privileges. |
+| `database.server.id`                    | true | null | The connector's identifier that must be unique within a database cluster and similar to the database's server-id configuration property. |
+| `topic.prefix`                  | true | null | The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
+| `database.include.list`            | false | null | A list of all databases hosted by this server which is monitored by the  connector.<br /><br /> This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
+| `key.converter`                         | true | null | The converter provided by Kafka Connect to convert record key. |
+| `value.converter`                       | true | null | The converter provided by Kafka Connect to convert record value.  |
+| `database.history`                      | true | null | The name of the database history class. |
+| `database.history.pulsar.topic`         | true | null | The name of the database history topic where the connector writes and recovers DDL statements. <br /><br />**Note: this topic is for internal use only and should not be used by consumers.** |
+| `database.history.pulsar.service.url`   | false| null | Pulsar cluster service URL for history topic. <br /><br />**Note**: If `database.history.pulsar.service.url` is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as `client_auth_plugin` and `client_auth_params`.|
+| `offset.storage.topic`                  | true | null | Record the last committed offsets that the connector successfully completes. |
+| `json-with-envelope`                    | false | false | Present the message that only consists of payload. |
 | `database.history.pulsar.reader.config` | false | null | The configs of the reader for the database schema history topic, in the form of a JSON string with key-value pairs. |
-| `offset.storage.reader.config` | false | null | The configs of the reader for the kafka connector offsets topic, in the form of a JSON string with key-value pairs. |
+| `offset.storage.reader.config`          | false | null | The configs of the reader for the kafka connector offsets topic, in the form of a JSON string with key-value pairs. |
 
 ### Converter Options
 
@@ -100,13 +100,13 @@ You can use one of the following methods to create a configuration file.
         "database.user": "debezium",
         "database.password": "dbz",
         "database.server.id": "184054",
-        "database.server.name": "dbserver1",
-        "database.whitelist": "inventory",
+        "topic.prefix": "dbserver1",
+        "database.include.list": "inventory",
         "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
         "database.history.pulsar.topic": "history-topic",
         "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
-        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
         "offset.storage.topic": "offset-topic"
      }
   }
@@ -132,15 +132,15 @@ You can use one of the following methods to create a configuration file.
       database.user: "debezium"
       database.password: "dbz"
       database.server.id: "184054"
-      database.server.name: "dbserver1"
-      database.whitelist: "inventory"
+      topic.prefix: "dbserver1"
+      database.include.list: "inventory"
       database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
       database.history.pulsar.topic: "history-topic"
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
 
       ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
-      key.converter: "org.apache.kafka.connect.json.JsonConverter"
-      value.converter: "org.apache.kafka.connect.json.JsonConverter"
+      key.converter: "org.apache.kafka.connect.storage.StringConverter"
+      value.converter: "org.apache.kafka.connect.storage.StringConverter"
 
       ## OFFSET_STORAGE_TOPIC_CONFIG
       offset.storage.topic: "offset-topic"
@@ -179,7 +179,7 @@ This example shows how to change the data of a MySQL table using the Pulsar Debe
            --name debezium-mysql-source \
            --tenant public \
            --namespace default \
-           --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter" [...]
+           --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","topic.prefix": "dbserver1","database.include.list": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.storage.StringConverter" [...]
        ```
 
      :::note
@@ -278,10 +278,10 @@ You can use one of the following methods to create a configuration file.
       "database.user": "postgres",
       "database.password": "changeme",
       "database.dbname": "postgres",
-      "database.server.name": "dbserver1",
+      "topic.prefix": "dbserver1",
       "plugin.name": "pgoutput",
-      "schema.whitelist": "public",
-      "table.whitelist": "public.users",
+      "schema.include.list": "public",
+      "table.include.list": "public.users",
       "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"
   }
   ```
@@ -305,10 +305,12 @@ You can use one of the following methods to create a configuration file.
       database.user: "postgres"
       database.password: "changeme"
       database.dbname: "postgres"
-      database.server.name: "dbserver1"
+      topic.prefix: "dbserver1"
       plugin.name: "pgoutput"
-      schema.whitelist: "public"
-      table.whitelist: "public.users"
+      schema.include.list: "public"
+      table.include.list: "public.users"
+      key.converter: "org.apache.kafka.connect.storage.StringConverter"
+      value.converter: "org.apache.kafka.connect.storage.StringConverter"
 
       ## PULSAR_SERVICE_URL_CONFIG
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
@@ -349,7 +351,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar
            --name debezium-postgres-source \
            --tenant public \
            --namespace default \
-           --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "changeme","database.dbname": "postgres","database.server.name": "dbserver1","plugin.name": "pgoutput","schema.whitelist": "public","table.whitelist": "public.users","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
+           --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "changeme","database.dbname": "postgres","topic.prefix": "dbserver1","plugin.name": "pgoutput","schema.include.list": "public","table.include.list": "public.users","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
        ```
 
      :::note
@@ -441,7 +443,7 @@ You can use one of the following methods to create a configuration file.
       "mongodb.user": "debezium",
       "mongodb.password": "dbz",
       "mongodb.task.id": "1",
-      "database.whitelist": "inventory",
+      "database.include.list": "inventory",
       "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"
   }
   ```
@@ -466,7 +468,7 @@ You can use one of the following methods to create a configuration file.
       mongodb.user: "debezium"
       mongodb.password: "dbz"
       mongodb.task.id: "1"
-      database.whitelist: "inventory"
+      database.include.list: "inventory"
       database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
   ```
 
@@ -509,7 +511,7 @@ This example shows how to change the data of a MongoDB table using the Pulsar De
            --name debezium-mongodb-source \
            --tenant public \
            --namespace default \
-           --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"}'
+           --source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.include.list": "inventory","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"}'
        ```
 
      :::note
@@ -585,14 +587,13 @@ Using YAML as an example, you can create a `debezium-oracle-source-config.yaml`
   "database.user": "dbzuser",
   "database.password": "dbz",
   "database.dbname": "XE",
-  "database.server.name": "XE",
+  "topic.prefix": "XE",
   "schema.exclude.list": "system,dbzuser",
   "snapshot.mode": "initial",
   "topic.namespace": "public/default",
   "task.class": "io.debezium.connector.oracle.OracleConnectorTask",
-  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "typeClassName": "org.apache.pulsar.common.schema.KeyValue",
+  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
   "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
   "database.tcpKeepAlive": "true",
   "decimal.handling.mode": "double",
@@ -619,14 +620,13 @@ configs:
     database.user: "dbzuser"
     database.password: "dbz"
     database.dbname: "XE"
-    database.server.name: "XE"
+    topic.prefix: "XE"
     schema.exclude.list: "system,dbzuser"
     snapshot.mode: "initial"
     topic.namespace: "public/default"
     task.class: "io.debezium.connector.oracle.OracleConnectorTask"
-    value.converter: "org.apache.kafka.connect.json.JsonConverter"
-    key.converter: "org.apache.kafka.connect.json.JsonConverter"
-    typeClassName: "org.apache.pulsar.common.schema.KeyValue"
+    key.converter: "org.apache.kafka.connect.storage.StringConverter"
+    value.converter: "org.apache.kafka.connect.storage.StringConverter"
     database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
     database.tcpKeepAlive: "true"
     decimal.handling.mode: "double"
@@ -655,13 +655,12 @@ Similarly to other connectors, you can use JSON or YAML to configure the connect
   "database.user": "sa",
   "database.password": "MyP@ssw0rd!",
   "database.dbname": "MyTestDB",
-  "database.server.name": "mssql",
+  "topic.prefix": "mssql",
   "snapshot.mode": "schema_only",
   "topic.namespace": "public/default",
   "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
-  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-  "typeClassName": "org.apache.pulsar.common.schema.KeyValue",
+  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
   "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
   "database.tcpKeepAlive": "true",
   "decimal.handling.mode": "double",
@@ -688,13 +687,12 @@ configs:
     database.user: "sa"
     database.password: "MyP@ssw0rd!"
     database.dbname: "MyTestDB"
-    database.server.name: "mssql"
+    topic.prefix: "mssql"
     snapshot.mode: "schema_only"
     topic.namespace: "public/default"
     task.class: "io.debezium.connector.sqlserver.SqlServerConnectorTask"
-    value.converter: "org.apache.kafka.connect.json.JsonConverter"
-    key.converter: "org.apache.kafka.connect.json.JsonConverter"
-    typeClassName: "org.apache.pulsar.common.schema.KeyValue"
+    key.converter: "org.apache.kafka.connect.storage.StringConverter"
+    value.converter: "org.apache.kafka.connect.storage.StringConverter"
     database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
     database.tcpKeepAlive: "true"
     decimal.handling.mode: "double"
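
The property renames in this diff follow a fixed pattern, so an existing connector config could in principle be migrated mechanically. The following is a minimal sketch, not part of the commit: `RENAMED_KEYS` and `migrate_configs` are hypothetical names, and the mapping covers only the four renames visible in the hunks above. It does not touch the converter classes or the removed `typeClassName` entry, which would still need manual review.

```python
import json

# Old property names mapped to the names this commit switches the docs to.
# Only the renames visible in the diff above are listed (assumption: a real
# config may use other deprecated keys that are not covered here).
RENAMED_KEYS = {
    "database.server.name": "topic.prefix",
    "database.whitelist": "database.include.list",
    "schema.whitelist": "schema.include.list",
    "table.whitelist": "table.include.list",
}


def migrate_configs(configs):
    """Return a copy of a connector `configs` mapping with old keys renamed."""
    migrated = {}
    for key, value in configs.items():
        new_key = RENAMED_KEYS.get(key, key)
        # Refuse to guess when both the old and the new name are present.
        if new_key != key and new_key in configs:
            raise ValueError(f"both {key!r} and {new_key!r} are set")
        migrated[new_key] = value
    return migrated


if __name__ == "__main__":
    # Values taken from the PostgreSQL example in the diff.
    old = {
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.server.name": "dbserver1",
        "schema.whitelist": "public",
        "table.whitelist": "public.users",
    }
    print(json.dumps(migrate_configs(old), indent=2))
```

Keys that are not in the mapping pass through unchanged, so the helper can be run over a config that has already been migrated.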
