KRISHNA-798 commented on issue #8619:
URL: https://github.com/apache/seatunnel/issues/8619#issuecomment-2645011156

   Hi @SoulSong , yes you are right but mongodb cdc works with "debezium_json" 
format in kafka source and sink.
   
   1st config:
   
   ```
   env {
     parallelism = 1
     job.name = "mongo_kafka"
     job.mode = "STREAMING"
     checkpoint.interval = 5000
   }
   
   source {
     MongoDB-CDC {
       hosts = "mongo:27017"
       database = ["****"]
       collection = ["****.****"]
       username = ****
       password = ****
       schema = {
         table = "****.****"
         fields {
           "_id" : string,
           "name" : string,
           "age": int
         }
       }
     }
   }
   
   sink {
     Console {
       parallelism = 1
     }
     kafka {
         topic = "mongo_cdc"
         bootstrap.servers = "ka****:9092"
         format = debezium_json
         kafka.request.timeout.ms = 60000
         semantics = EXACTLY_ONCE
         kafka.config = {
           acks = "all"
           request.timeout.ms = 60000
           buffer.memory = 33554432
         }
     }
   }
   ```
   
   
   2nd config:
   
   ```
   env {
     parallelism = 1
     job.mode = "STREAMING"
     job.name = "kafka_mongo"
     logging.level = "DEBUG"
   }
   
   source {
     Kafka {
       topic = "mongo_cdc"
       bootstrap.servers = "ka****:9092"
       # start_mode="earliest"
       consumer.group = "group1"
       format = debezium_json
       debezium_record_include_schema = false
       schema {
        fields {
           _id = string
           name = string
           age = int
         }
       }
       kafka.config = {
         auto.offset.reset = "earliest"
         enable.auto.commit = "false"
         max.poll.records = 1
       }
     }
   }
   
   sink {
     Console{}
     MongoDB {
       uri = "mongodb://mo****:27017/****"
       database = "****"
       collection = "****_target"
       upsert-enable = true
       primary-key = ["_id"]
       schema = {
         fields {
           _id = string
           name = string
           age = int
         }
       }
     }
   }
   ```
   
   Hope this helps.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to