lovatti opened a new issue, #9190:
URL: https://github.com/apache/seatunnel/issues/9190
Hello!
I successfully configured Apache SeaTunnel + Oracle-CDC with LogMiner to send
data to ClickHouse.
I need to capture data changes, so I use Oracle-CDC in STREAMING mode.
But when the SeaTunnel job restarts, all the records are read again and
replicated to ClickHouse unless I filter them in the transform.
I tried a lot of config combinations without success.
Has anyone configured this?
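I wonder if I should be restoring the job instead of resubmitting it. A sketch of what I mean (flag per the Zeta engine CLI help; the config path and job id are placeholders):

```shell
# Submit the job once and note the job id it prints
./bin/seatunnel.sh --config ./config/oracle-cdc-to-clickhouse.conf

# After a stop, restore by job id instead of resubmitting, so the
# Oracle-CDC source continues from the checkpointed position rather
# than re-running the initial snapshot
./bin/seatunnel.sh --config ./config/oracle-cdc-to-clickhouse.conf -r <job-id>
```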
My config files for small tables (from a few to 30 million records) look like:
env {
  # You can set SeaTunnel environment configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 120000
  checkpoint.timeout = 120000
}
source {
  # This is an example source plugin **only for test and demonstrate the feature source plugin**
  Oracle-CDC {
    plugin_output = "ora-ge_empresa"
    username = "proj"
    password = "mypass"
    database-names = ["CONSHO"]
    schema-names = ["CONSINCO"]
    table-names = ["CONSHO.CONSINCO.GE_EMPRESA"]
    base-url = "jdbc:oracle:thin:@ora-ip:1521:CONSHO"
    driver = "oracle.jdbc.OracleDriver"
    source.reader.close.timeout = 120000
  }
}
sink {
  Clickhouse {
    plugin_input = "ora-ge_empresa"
    host = "ch-ip:8123"
    database = "CONSINCO"
    table = "GE_EMPRESA"
    #table = "${table_name}"
    username = "default"
    password = "mypass"
    # cdc options
    #primary_key = "id"
    support_upsert = true
    allow_experimental_lightweight_delete = true
  }
}
------------------------------------
For big tables:
env {
  # You can set SeaTunnel environment configuration here
  parallelism = 1
  job.mode = "STREAMING"
  log_level = "DEBUG"
  checkpoint.interval = 30000   # 30 seconds
  checkpoint.timeout = 21600000 # 6 hours
}
source {
  Oracle-CDC {
    plugin_output = "ora-mrl_lanctoestoque"
    username = "proj"
    password = "mypass"
    database-names = ["CONSHO"]
    schema-names = ["CONSINCO"]
    table-names = ["CONSHO.CONSINCO.MRL_LANCTOESTOQUE"]
    base-url = "jdbc:oracle:thin:@ora-ip:1521:CONSHO"
    driver = "oracle.jdbc.OracleDriver"
    source.reader.close.timeout = 18000000 # 5 hours
    #startup.mode = "earliest"
  }
}
transform {
  sql {
    plugin_input = "ora-mrl_lanctoestoque"
    plugin_output = "transf-mrl_lanctoestoque"
    query = """SELECT
      col1,
      col2,
      ....
      from dual
      where DATEFIELD >= TO_DATE('2025-01-01 00:00:01','yyyy-MM-dd HH:mm:ss')
    """
  }
}
sink {
  Clickhouse {
    plugin_input = "transf-mrl_lanctoestoque"
    host = "ch-ip:8123"
    database = "CONSINCO"
    table = "MRL_LANCTOESTOQUE"
    #table = "${table_name}"
    username = "default"
    password = "mypass"
    # cdc options
    #primary_key = ""
    support_upsert = true
    allow_experimental_lightweight_delete = true
    bulk_size = 50000
  }
}
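One option I am considering (the startup.mode option appears in the Oracle-CDC connector docs, and I have it commented out above, but I have not verified the exact behavior) is skipping the snapshot phase entirely after the first full load, since at that point I only need the changes:

```
source {
  Oracle-CDC {
    # ... same connection options as above ...
    # "initial" snapshots the whole table first; "latest" is supposed to
    # start from the current redo log position and only read new changes
    startup.mode = "latest"
  }
}
```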
This example for big tables WORKS, but all data is read again on every
restart, so I have to filter in the transform, which can take a very long
time: in this case, 12 hours for 200 million records in a Dev environment.
I would like to filter at the Source instead, so I tried some Debezium
options, like snapshot.select.statement.overrides, without success.
But the Source always tries to read all rows again. Apparently the Debezium
options are not being used, or (I hope) they are just configured wrong.
Any help?
Thank you!
format = compatible_debezium_json
debezium {
  # basic parameters
  # database.server.name = "oracle_server"
  #database.history.skip.unparseable.ddl = true
  #database.url = "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = ora-ip)(PORT = 1521)))(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = CONSHO)))"
  # database.url = "jdbc:oracle:thin:@ora-ip:1521:CONSHO"
  # database.port = "1521"
  # database.user = "proj"
  # database.password = "mypass"
  # database.dbname = "CONSHO"
  #
  # schema.include.list = "CONSINCO"
  # table.include.list = "CONSINCO.MRL_PRODUTOEMPRESA"
  #
  # database.connection.adapter = "logminer"
  # log.mining.strategy = "online_catalog"
  # include schema in the Kafka message
  key.converter.schemas.enable = false
  value.converter.schemas.enable = false
  # "initial" or "schema_only"
  snapshot.mode = "initial"
  #snapshot.locking.mode = "none"
  snapshot.select.statement.overrides = "CONSHO.CONSINCO.MRL_PRODUTOEMPRESA"
  snapshot.select.statement.overrides.CONSHO.CONSINCO.MRL_PRODUTOEMPRESA = "SELECT * FROM CONSINCO.MRL_PRODUTOEMPRESA WHERE DTAALTERACAO > TO_DATE('2025-01-01','YYYY-MM-DD') FETCH FIRST 3 ROWS ONLY"
  # signal.data.collection = "DB.USER.SIGNAL_SNAPSHOT"
  # signal.enabled.channels = "source"
}
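For completeness, this is where I believe the Debezium passthrough options are supposed to go: nested inside the Oracle-CDC source block rather than at the top level of the file (my assumption from the connector docs; the table keys are from my config above, and I have not confirmed the overrides are honored during the snapshot):

```
source {
  Oracle-CDC {
    # ... connection options as in the configs above ...
    # Debezium properties are passed through from this nested block
    debezium {
      snapshot.mode = "initial"
      snapshot.select.statement.overrides = "CONSHO.CONSINCO.MRL_PRODUTOEMPRESA"
      snapshot.select.statement.overrides.CONSHO.CONSINCO.MRL_PRODUTOEMPRESA = "SELECT * FROM CONSINCO.MRL_PRODUTOEMPRESA WHERE DTAALTERACAO > TO_DATE('2025-01-01','YYYY-MM-DD')"
    }
  }
}
```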
Bruno
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]