KRISHNA-798 commented on issue #8619:
URL: https://github.com/apache/seatunnel/issues/8619#issuecomment-2645011156
Hi @SoulSong, yes, you are right, but MongoDB CDC works with the `debezium_json`
format in the Kafka source and sink.
First config (MongoDB CDC source → Kafka sink):
```
env {
parallelism = 1
job.name = "mongo_kafka"
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo:27017"
database = ["****"]
collection = ["****.****"]
username = ****
password = ****
schema = {
table = "****.****"
fields {
"_id" : string,
"name" : string,
"age": int
}
}
}
}
sink {
Console {
parallelism = 1
}
kafka {
topic = "mongo_cdc"
bootstrap.servers = "ka****:9092"
format = debezium_json
kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
request.timeout.ms = 60000
buffer.memory = 33554432
}
}
}
```
Second config (Kafka source → MongoDB sink):
```
env {
parallelism = 1
job.mode = "STREAMING"
job.name = "kafka_mongo"
logging.level = "DEBUG"
}
source {
Kafka {
topic = "mongo_cdc"
bootstrap.servers = "ka****:9092"
# start_mode="earliest"
consumer.group = "group1"
format = debezium_json
debezium_record_include_schema = false
schema {
fields {
_id = string
name = string
age = int
}
}
kafka.config = {
auto.offset.reset = "earliest"
enable.auto.commit = "false"
max.poll.records = 1
}
}
}
sink {
Console{}
MongoDB {
uri = "mongodb://mo****:27017/****"
database = "****"
collection = "****_target"
upsert-enable = true
primary-key = ["_id"]
schema = {
fields {
_id = string
name = string
age = int
}
}
}
}
```
Hope this helps.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]