This is an automated email from the ASF dual-hosted git repository.
bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9288d987f6 Kafka Connect: Add config to prefix the control consumer group (#11599)
9288d987f6 is described below
commit 9288d987f6a62b54dbe3cb7bdf61f896e7a8b0b5
Author: Hugo Friant <[email protected]>
AuthorDate: Wed Nov 27 15:47:51 2024 +0100
Kafka Connect: Add config to prefix the control consumer group (#11599)
* Add the ability to change the control group prefix
* Add doc
* Review naming
* Review naming
* Fix violations
---
docs/docs/kafka-connect.md | 1 +
.../java/org/apache/iceberg/connect/IcebergSinkConfig.java | 11 +++++++++++
.../main/java/org/apache/iceberg/connect/channel/Worker.java | 2 +-
3 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md
index a904a17a99..836f1d8852 100644
--- a/docs/docs/kafka-connect.md
+++ b/docs/docs/kafka-connect.md
@@ -77,6 +77,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
+| iceberg.control.group-id-prefix | Prefix for the control consumer group, default is `cg-control` |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index bf5b59a0f0..8e59d73923 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -80,6 +80,7 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
+ private static final String CONTROL_GROUP_ID_PREFIX_PROP =
"iceberg.control.group-id-prefix";
private static final String COMMIT_INTERVAL_MS_PROP =
"iceberg.control.commit.interval-ms";
private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000;
private static final String COMMIT_TIMEOUT_MS_PROP =
"iceberg.control.commit.timeout-ms";
@@ -180,6 +181,12 @@ public class IcebergSinkConfig extends AbstractConfig {
DEFAULT_CONTROL_TOPIC,
Importance.MEDIUM,
"Name of the control topic");
+ configDef.define(
+ CONTROL_GROUP_ID_PREFIX_PROP,
+ ConfigDef.Type.STRING,
+ DEFAULT_CONTROL_GROUP_PREFIX,
+ Importance.LOW,
+ "Prefix of the control consumer group");
configDef.define(
CONNECT_GROUP_ID_PROP,
ConfigDef.Type.STRING,
@@ -359,6 +366,10 @@ public class IcebergSinkConfig extends AbstractConfig {
return getString(CONTROL_TOPIC_PROP);
}
+ public String controlGroupIdPrefix() {
+ return getString(CONTROL_GROUP_ID_PREFIX_PROP);
+ }
+
public String connectGroupId() {
String result = getString(CONNECT_GROUP_ID_PROP);
if (result != null) {
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
index 7555b216cd..27c5b9622f 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
@@ -51,7 +51,7 @@ class Worker extends Channel {
// pass transient consumer group ID to which we never commit offsets
super(
"worker",
- IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(),
+ config.controlGroupIdPrefix() + UUID.randomUUID(),
config,
clientFactory,
context);