krutoileshii commented on issue #10317:
URL: https://github.com/apache/seatunnel/issues/10317#issuecomment-3739890549

   @yzeng1618 Thank you, adding is_regex got it working.
   
   I do have a question: if I have multiple tables coming from the source, is 
there a way to specify a different primary key combination for each one, since 
they could differ depending on the table? Or do I have to define multiple 
syncs for this?
   
   ```
   ############################################################
   # vendorroyalties_cdc_to_iceberg_polaris_upsert.conf
   #
   # Streaming job: SqlServer-CDC source -> Metadata -> TableRename ->
   # FieldRename -> Iceberg sink (upsert mode, REST catalog).
   ############################################################
   env {
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 10000
     checkpoint.timeout = 600000
   }
   source {
     SqlServer-CDC {
       plugin_output = "mssql_cdc"
       username = "user_name"
       password = "password"
       url = "jdbc:sqlserver://IP:PORT;databaseName=db_name;encrypt=true;trustServerCertificate=true"
       database-names = ["AlloCustom"]
       # Two tables are captured by this single source.
       table-names = ["AlloCustom.Finance.VendorRoyalties","AlloCustom.Finance.Parts"]
       startup.mode = "initial"
       debezium {
         database.encrypt = "true"
         database.trustServerCertificate = "true"
       }
       incremental.parallelism = 1
       snapshot.split.size = 8096
     }
   }
   transform {
     # Step 1: attach CDC metadata columns to every row.
     Metadata {
       metadata_fields {
         Database = database
         Table = table
         RowKind = rowKind
         EventTime = ts_ms
         Delay = delay
       }
       plugin_input = "mssql_cdc"
       plugin_output = "add_metadata"
     }
     # Step 2: rewrite table names to lower snake_case.
     TableRename {
       plugin_input = "add_metadata"
       plugin_output = "table_names_to_snake_case"
       replacements_with_regex = [
         # Normalize separators (kebab, spaces, dots, $, etc.) -> underscore
         { replace_from = "[^A-Za-z0-9_]+", replace_to = "_" },
         # Insert underscores at boundaries (no capture groups needed)
         { replace_from = "(?<=[A-Z])(?=[A-Z][a-z])", replace_to = "_" }, # HTTPResponse -> HTTP_Response
         { replace_from = "(?<=[a-z0-9])(?=[A-Z])", replace_to = "_" }, # camelCase / 500Error
         { replace_from = "(?<=[A-Za-z])(?=[0-9])", replace_to = "_" }, # version2 -> version_2
         { replace_from = "(?<=[0-9])(?=[A-Za-z])", replace_to = "_" } # user123Name -> user_123_Name
       ]
       convert_case = "LOWER"
     }
     # Step 3: apply the same snake_case rules to column names.
     FieldRename {
       plugin_input = "table_names_to_snake_case"
       plugin_output = "column_names_to_snake_case"
       replacements_with_regex = [
         { replace_from = "[^A-Za-z0-9_]+", replace_to = "_" },
         { replace_from = "(?<=[A-Z])(?=[A-Z][a-z])", replace_to = "_" },
         { replace_from = "(?<=[a-z0-9])(?=[A-Z])", replace_to = "_" },
         { replace_from = "(?<=[A-Za-z])(?=[0-9])", replace_to = "_" },
         { replace_from = "(?<=[0-9])(?=[A-Za-z])", replace_to = "_" }
       ]
       convert_case = "LOWER"
     }
   }
   sink {
     Iceberg {
       plugin_input = "column_names_to_snake_case"
       catalog_name = "polaris"
       namespace = "allocustom"
       table = "${schema_name}__${table_name}"
       # Upsert mode for CDC correctness
       iceberg.table.upsert-mode-enabled = true
       # Best-effort composite key (edit if you know the real unique key)
       # NOTE(review): one entry per renamed table; whether the sink accepts
       # this per-table list form is unconfirmed - verify against the Iceberg
       # sink documentation.
       iceberg.table.primary-keys = [
           { "vendor_royalties" = "invoice_num,invoice_line,vendor_num,vendor_id"},
           {"parts" = "part_num,company,vendor_num"}
         ]

       iceberg.table.schema-evolution-enabled = true
       iceberg.table.write-props = {
         write.format.default = "parquet"
       }
       iceberg.catalog.config = {
         "type" = "rest"
         "uri" = "http://ip:8181/api/catalog"
         "warehouse" = "data-warehouse-test"
         "credential" = "admin:password"
         "scope" = "PRINCIPAL_ROLE:ALL"
         "token-refresh-enabled" = "true"
         "io-impl" = "org.apache.iceberg.aws.s3.S3FileIO"
         "warehouse.location" = "s3://testdec16/raw"
         "client.region" = "us-east-1"
         "s3.endpoint" = "https://endpoint"
         "s3.path-style-access" = "true"
         "s3.access-key-id" = "access_key"
         "s3.secret-access-key" = "secret_key"
       }
     }
   }
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to