This is an automated email from the ASF dual-hosted git repository.
bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 03fc1ae330 Kafka Connect: Add config for transactional ID prefix (#11780)
03fc1ae330 is described below
commit 03fc1ae33096716e9afdc7c4f1dce91de48f32bc
Author: Thomas Jaeckle <[email protected]>
AuthorDate: Fri Mar 14 16:21:41 2025 +0100
Kafka Connect: Add config for transactional ID prefix (#11780)
---
docs/docs/kafka-connect.md | 1 +
.../org/apache/iceberg/connect/IcebergSinkConfig.java | 17 +++++++++++++++++
.../org/apache/iceberg/connect/channel/Channel.java | 2 +-
3 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md
index 929c43220c..92bcb8c708 100644
--- a/docs/docs/kafka-connect.md
+++ b/docs/docs/kafka-connect.md
@@ -81,6 +81,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
+| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index 8e59d73923..a4e15932f1 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -87,6 +87,8 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final int COMMIT_TIMEOUT_MS_DEFAULT = 30_000;
private static final String COMMIT_THREADS_PROP =
"iceberg.control.commit.threads";
private static final String CONNECT_GROUP_ID_PROP =
"iceberg.connect.group-id";
+ private static final String TRANSACTIONAL_PREFIX_PROP =
+ "iceberg.coordinator.transactional.prefix";
private static final String HADOOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir";
private static final String NAME_PROP = "name";
@@ -211,6 +213,12 @@ public class IcebergSinkConfig extends AbstractConfig {
Runtime.getRuntime().availableProcessors() * 2,
Importance.MEDIUM,
"Coordinator threads to use for table commits, default is (cores *
2)");
+ configDef.define(
+ TRANSACTIONAL_PREFIX_PROP,
+ ConfigDef.Type.STRING,
+ null,
+ Importance.LOW,
+ "Optional prefix of the transactional id for the coordinator");
configDef.define(
HADOOP_CONF_DIR_PROP,
ConfigDef.Type.STRING,
@@ -393,6 +401,15 @@ public class IcebergSinkConfig extends AbstractConfig {
return getInt(COMMIT_THREADS_PROP);
}
+ public String transactionalPrefix() {
+ String result = getString(TRANSACTIONAL_PREFIX_PROP);
+ if (result != null) {
+ return result;
+ }
+
+ return "";
+ }
+
public String hadoopConfDir() {
return getString(HADOOP_CONF_DIR_PROP);
}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
index 993fcf67c9..01cf165de6 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
@@ -64,7 +64,7 @@ abstract class Channel {
this.connectGroupId = config.connectGroupId();
this.context = context;
- String transactionalId = name + config.transactionalSuffix();
+ String transactionalId = config.transactionalPrefix() + name +
config.transactionalSuffix();
this.producer = clientFactory.createProducer(transactionalId);
this.consumer = clientFactory.createConsumer(consumerGroupId);
this.admin = clientFactory.createAdmin();