krutoileshii commented on issue #10317:
URL: https://github.com/apache/seatunnel/issues/10317#issuecomment-3739890549
@yzeng1618 Thank you — adding is_regex got it working.
I do have a question: if I have multiple tables coming from the source, is
there a way to specify a different primary-key combination for each table,
since the keys can differ from table to table? Or do I have to define a
separate sync for each table?
```
############################################################
# vendorroyalties_cdc_to_iceberg_polaris_upsert.conf
############################################################
env {
  # Streaming job with a single parallel pipeline
  job.mode = "STREAMING"
  parallelism = 1

  # Checkpoint cadence and timeout, in milliseconds
  checkpoint.interval = 10000
  checkpoint.timeout = 600000
}
source {
  SqlServer-CDC {
    plugin_output = "mssql_cdc"

    # Connection settings (values shown are placeholders)
    url = "jdbc:sqlserver://IP:PORT;databaseName=db_name;encrypt=true;trustServerCertificate=true"
    username = "user_name"
    password = "password"

    # Tables to capture, as database.schema.table
    database-names = ["AlloCustom"]
    table-names = ["AlloCustom.Finance.VendorRoyalties", "AlloCustom.Finance.Parts"]

    startup.mode = "initial"
    incremental.parallelism = 1
    snapshot.split.size = 8096

    # Extra properties handed to the embedded Debezium connector
    debezium {
      database.encrypt = "true"
      database.trustServerCertificate = "true"
    }
  }
}
transform {
  # Copy row metadata into new columns named Database, Table, RowKind,
  # EventTime and Delay.
  Metadata {
    metadata_fields {
      Database = database
      Table = table
      RowKind = rowKind
      EventTime = ts_ms
      Delay = delay
    }
    plugin_input = "mssql_cdc"
    plugin_output = "add_metadata"
  }

  # Rewrite table names to snake_case. Rules are applied in list order, then
  # the result is lower-cased.
  # Keep every comment on its own '#' line (or trailing on the same line as a
  # rule): text that wraps onto a new line without '#' is parsed as an unquoted
  # string element of the list and breaks the rule list.
  TableRename {
    plugin_input = "add_metadata"
    plugin_output = "table_names_to_snake_case"
    replacements_with_regex = [
      # Normalize separators (kebab, spaces, dots, $, etc.) -> underscore
      { replace_from = "[^A-Za-z0-9_]+", replace_to = "_" },
      # Insert underscores at boundaries (no capture groups needed):
      # HTTPResponse -> HTTP_Response
      { replace_from = "(?<=[A-Z])(?=[A-Z][a-z])", replace_to = "_" },
      # camelCase / 500Error
      { replace_from = "(?<=[a-z0-9])(?=[A-Z])", replace_to = "_" },
      # version2 -> version_2
      { replace_from = "(?<=[A-Za-z])(?=[0-9])", replace_to = "_" },
      # user123Name -> user_123_Name
      { replace_from = "(?<=[0-9])(?=[A-Za-z])", replace_to = "_" }
    ]
    convert_case = "LOWER"
  }

  # Same snake_case rules as TableRename, applied to column names.
  FieldRename {
    plugin_input = "table_names_to_snake_case"
    plugin_output = "column_names_to_snake_case"
    replacements_with_regex = [
      # Separators -> underscore
      { replace_from = "[^A-Za-z0-9_]+", replace_to = "_" },
      # HTTPResponse -> HTTP_Response
      { replace_from = "(?<=[A-Z])(?=[A-Z][a-z])", replace_to = "_" },
      # camelCase -> camel_Case
      { replace_from = "(?<=[a-z0-9])(?=[A-Z])", replace_to = "_" },
      # version2 -> version_2
      { replace_from = "(?<=[A-Za-z])(?=[0-9])", replace_to = "_" },
      # 123Name -> 123_Name
      { replace_from = "(?<=[0-9])(?=[A-Za-z])", replace_to = "_" }
    ]
    convert_case = "LOWER"
  }
}
sink {
  Iceberg {
    plugin_input = "column_names_to_snake_case"
    catalog_name = "polaris"
    namespace = "allocustom"
    # Target table name built from the ${schema_name} / ${table_name} placeholders
    table = "${schema_name}__${table_name}"

    # Upsert mode for CDC correctness
    iceberg.table.upsert-mode-enabled = true
    # NOTE(review): SeaTunnel's Iceberg sink docs describe
    # iceberg.table.primary-keys as one comma-separated column list (a string),
    # not a per-table map. Confirm that your connector version accepts the list
    # form below. If it does not, per-table keys may have to come from the
    # source schema (e.g. the CDC source's table-names-config option) or from
    # separate jobs. Verify before relying on it.
    # Best-effort composite keys per target table (edit if you know the real unique key)
    iceberg.table.primary-keys = [
      { "vendor_royalties" = "invoice_num,invoice_line,vendor_num,vendor_id" },
      { "parts" = "part_num,company,vendor_num" }
    ]
    iceberg.table.schema-evolution-enabled = true

    iceberg.table.write-props = {
      write.format.default = "parquet"
    }

    # REST catalog (Polaris) + S3 storage settings.
    # NOTE(review): credential/keys below are placeholders; avoid committing
    # real secrets in this file.
    iceberg.catalog.config = {
      "type" = "rest"
      "uri" = "http://ip:8181/api/catalog"
      "warehouse" = "data-warehouse-test"
      "credential" = "admin:password"
      "scope" = "PRINCIPAL_ROLE:ALL"
      "token-refresh-enabled" = "true"
      "io-impl" = "org.apache.iceberg.aws.s3.S3FileIO"
      "warehouse.location" = "s3://testdec16/raw"
      "client.region" = "us-east-1"
      "s3.endpoint" = "https://endpoint"
      "s3.path-style-access" = "true"
      "s3.access-key-id" = "access_key"
      "s3.secret-access-key" = "secret_key"
    }
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]