This is an automated email from the ASF dual-hosted git repository.

bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9288d987f6 Kafka Connect: Add config to prefix the control consumer group (#11599)
9288d987f6 is described below

commit 9288d987f6a62b54dbe3cb7bdf61f896e7a8b0b5
Author: Hugo Friant <[email protected]>
AuthorDate: Wed Nov 27 15:47:51 2024 +0100

    Kafka Connect: Add config to prefix the control consumer group (#11599)
    
    * Add the ability to change the control group prefix
    
    * Add doc
    
    * Review naming
    
    * Review naming
    
    * Fix violations
---
 docs/docs/kafka-connect.md                                    |  1 +
 .../java/org/apache/iceberg/connect/IcebergSinkConfig.java    | 11 +++++++++++
 .../main/java/org/apache/iceberg/connect/channel/Worker.java  |  2 +-
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md
index a904a17a99..836f1d8852 100644
--- a/docs/docs/kafka-connect.md
+++ b/docs/docs/kafka-connect.md
@@ -77,6 +77,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
 | iceberg.table.\<table name\>.partition-by  | Comma-separated list of 
partition fields to use when creating the table                                 
         |
 | iceberg.table.\<table name\>.route-regex   | The regex used to match a 
record's `routeField` to a table                                                
       |
 | iceberg.control.topic                      | Name of the control topic, 
default is `control-iceberg`                                                    
      |
+| iceberg.control.group-id-prefix            | Prefix for the control consumer 
group, default is `cg-control`                                                  
 |
 | iceberg.control.commit.interval-ms         | Commit interval in msec, 
default is 300,000 (5 min)                                                      
        |
 | iceberg.control.commit.timeout-ms          | Commit timeout interval in 
msec, default is 30,000 (30 sec)                                                
      |
 | iceberg.control.commit.threads             | Number of threads to use for 
commits, default is (cores * 2)                                                 
    |
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index bf5b59a0f0..8e59d73923 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -80,6 +80,7 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
       "iceberg.tables.schema-case-insensitive";
   private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
+  private static final String CONTROL_GROUP_ID_PREFIX_PROP = "iceberg.control.group-id-prefix";
   private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
   private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000;
   private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms";
@@ -180,6 +181,12 @@ public class IcebergSinkConfig extends AbstractConfig {
         DEFAULT_CONTROL_TOPIC,
         Importance.MEDIUM,
         "Name of the control topic");
+    configDef.define(
+        CONTROL_GROUP_ID_PREFIX_PROP,
+        ConfigDef.Type.STRING,
+        DEFAULT_CONTROL_GROUP_PREFIX,
+        Importance.LOW,
+        "Prefix of the control consumer group");
     configDef.define(
         CONNECT_GROUP_ID_PROP,
         ConfigDef.Type.STRING,
@@ -359,6 +366,10 @@ public class IcebergSinkConfig extends AbstractConfig {
     return getString(CONTROL_TOPIC_PROP);
   }
 
+  public String controlGroupIdPrefix() {
+    return getString(CONTROL_GROUP_ID_PREFIX_PROP);
+  }
+
   public String connectGroupId() {
     String result = getString(CONNECT_GROUP_ID_PROP);
     if (result != null) {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
index 7555b216cd..27c5b9622f 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
@@ -51,7 +51,7 @@ class Worker extends Channel {
     // pass transient consumer group ID to which we never commit offsets
     super(
         "worker",
-        IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(),
+        config.controlGroupIdPrefix() + UUID.randomUUID(),
         config,
         clientFactory,
         context);

Reply via email to