lovatti opened a new issue, #9190:
URL: https://github.com/apache/seatunnel/issues/9190

   Hello!
   
   I successfully configured Apache SeaTunnel + Oracle-CDC and LogMiner to send 
data to ClickHouse.
   
   I need to capture the data changes, so I use Oracle-CDC in STREAMING mode.
   
   But whenever the SeaTunnel job restarts, all the records are read again and 
replicated to ClickHouse unless I filter them in the transform.
   
   I tried a lot of config combinations without success.
   
   Has anyone configured this successfully?
   
   My config files for small tables (from a few up to 30 million records) look like this:
   
   env {
     # You can set SeaTunnel environment configuration here
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 120000
     checkpoint.timeout = 120000
   }
   
   source {
     # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
     Oracle-CDC {
       plugin_output = "ora-ge_empresa"
       username = "proj"
       password = "mypass"
       database-names = ["CONSHO"]
       schema-names = ["CONSINCO"]
       table-names = ["CONSHO.CONSINCO.GE_EMPRESA"]
       base-url = "jdbc:oracle:thin:@ora-ip:1521:CONSHO"
       driver = "oracle.jdbc.OracleDriver"
       source.reader.close.timeout = 120000
     }
   }
   
   sink {
     Clickhouse {
       plugin_input = "ora-ge_empresa"
       host = "ch-ip:8123"
       database = "CONSINCO"
       table = "GE_EMPRESA"
       #table = "${table_name}"
       username = "default"
       password = "mypass"
       # cdc options
       #primary_key = "id"
       support_upsert = true
       allow_experimental_lightweight_delete = true
     }
   }
   
   ------------------------------------
   
   For big tables:
   
   env {
     # You can set SeaTunnel environment configuration here
     parallelism = 1
     job.mode = "STREAMING"
     log_level = "DEBUG"
     checkpoint.interval = 30000 # 30 seconds
     checkpoint.timeout = 21600000 # 6 hours
   }
   
   source {
     Oracle-CDC {
       plugin_output = "ora-mrl_lanctoestoque"
       username = "proj"
       password = "mypass"
       database-names = ["CONSHO"]
       schema-names = ["CONSINCO"]
       table-names = ["CONSHO.CONSINCO.MRL_LANCTOESTOQUE"]
       base-url = "jdbc:oracle:thin:@ora-ip:1521:CONSHO"
       driver = "oracle.jdbc.OracleDriver" 
       source.reader.close.timeout = 18000000 # 5 hours
       #startup.mode = "earliest"
     }
   }
   
   transform {
     sql {
       plugin_input = "ora-mrl_lanctoestoque"
       plugin_output = "transf-mrl_lanctoestoque"
       query = """SELECT 
   col1,
   col2,
   ....
   from dual
   where DATEFIELD >= TO_DATE('2025-01-01 00:00:01','yyyy-MM-dd HH:mm:ss')    
       """
     }
   }
   
   sink {
     Clickhouse {
       plugin_input = "transf-mrl_lanctoestoque"
       host = "ch-ip:8123"
       database = "CONSINCO"
       table = "MRL_LANCTOESTOQUE"
       #table = "${table_name}"
       username = "default"
       password = "mypass"
       # cdc options
       #primary_key = ""
       support_upsert = true
       allow_experimental_lightweight_delete = true
       bulk_size = 50000
     }
   }
   
   
   This example for big tables WORKS, but all the data is read again on every 
restart, so I have to filter it in the transform, which can take a very long 
time: in this case, 12 hours for 200 million records in a Dev environment.
   
   I would like to filter at the Source instead, and I tried some Debezium 
options without success.
   
   For example, snapshot.select.statement.overrides (my attempt is shown below).
   
   But the Source always tries to read all rows again. Apparently the Debezium 
options are not being used, or (I hope) I have simply configured them incorrectly.
   
   Any help?
   
   Thank you!
   
   
   format = compatible_debezium_json
       debezium {
         # basic parameters
         # database.server.name = "oracle_server"
         #database.history.skip.unparseable.ddl = true
         #database.url = "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS_LIST = 
(ADDRESS = (PROTOCOL = TCP)(HOST = ora-ip)(PORT = 1521)))(CONNECT_DATA = 
(SERVER = DEDICATED)(SERVICE_NAME = CONSHO)))"
         # database.url = "jdbc:oracle:thin:@ora-ip:1521:CONSHO"
         # database.port = "1521"
         # database.user = "proj"
         # database.password = "mypass"
         # database.dbname = "CONSHO"
         #
         # schema.include.list = "CONSINCO"
         # table.include.list = "CONSINCO.MRL_PRODUTOEMPRESA"
         #
         # database.connection.adapter = "logminer"
         # log.mining.strategy = "online_catalog"
         
         # include schema into kafka message
         key.converter.schemas.enable = false
         value.converter.schemas.enable = false
   
         # "initial" or "schema_only"
         snapshot.mode = "initial"
         #snapshot.locking.mode = "none"
         snapshot.select.statement.overrides = 
"CONSHO.CONSINCO.MRL_PRODUTOEMPRESA"
         snapshot.select.statement.overrides.CONSHO.CONSINCO.MRL_PRODUTOEMPRESA 
= "SELECT * FROM CONSINCO.MRL_PRODUTOEMPRESA WHERE DTAALTERACAO > 
TO_DATE('2025-01-01','YYYY-MM-DD') FETCH FIRST 3 ROWS ONLY"
         # signal.data.collection = "DB.USER.SIGNAL_SNAPSHOT"
         # signal.enabled.channels = "source"
       }
   
   
   
   Bruno


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to