kumarpritam863 opened a new pull request, #15207:
URL: https://github.com/apache/iceberg/pull/15207
# Summary
This PR adds support for pluggable Committer implementations in the
Iceberg Kafka Connect connector, allowing users to provide custom commit
strategies via configuration. Previously, the connector was hardcoded to use
**CommitterImpl**. This change enables users to implement and configure custom
committers without modifying the connector source code.
# Motivation
Different use cases may require different commit strategies:
- Alternative coordination mechanisms: users may want to use external consensus systems (e.g., ZooKeeper, etcd, Apache Ratis)
- Custom commit policies: time-based, size-based, or business-logic-driven commit strategies
- Enhanced monitoring: custom metrics collection and observability
- Specialized error handling: domain-specific retry logic and failure recovery
The current implementation hardcodes CommitterImpl, requiring users to
fork the codebase to implement custom commit logic. This PR makes the committer
pluggable while maintaining full backward
compatibility.
# Backward Compatibility
This change is fully backward compatible:
- When iceberg.committer.class is not configured, the connector uses the default CommitterImpl
- Existing configurations work without modifications
- No changes to the Committer interface
- No breaking API changes
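
The fallback rule above can be illustrated with a minimal, self-contained sketch. This is not the PR's actual CommitterFactory: the `Committer` interface here is a simplified stand-in with a single hypothetical `name()` method, and `DefaultCommitter` and `MyCustomCommitter` are placeholder classes. Only the property name iceberg.committer.class comes from the PR description. The sketch also assumes a custom committer exposes a public no-argument constructor.

```java
import java.util.Map;

public class CommitterFactorySketch {

  /** Simplified stand-in for the connector's Committer interface (hypothetical). */
  public interface Committer {
    String name();
  }

  /** Stand-in for the built-in CommitterImpl. */
  public static class DefaultCommitter implements Committer {
    @Override
    public String name() {
      return "default";
    }
  }

  /** Example user-supplied committer; assumed to need a public no-arg constructor. */
  public static class MyCustomCommitter implements Committer {
    @Override
    public String name() {
      return "custom";
    }
  }

  static final String COMMITTER_CLASS_PROP = "iceberg.committer.class";

  /** Returns the default committer unless the property names another class. */
  public static Committer create(Map<String, String> props) {
    String className = props.get(COMMITTER_CLASS_PROP);
    if (className == null || className.trim().isEmpty()) {
      // Property absent or blank: behave exactly as before the change.
      return new DefaultCommitter();
    }
    try {
      // asSubclass rejects classes that do not implement Committer.
      Class<? extends Committer> clazz =
          Class.forName(className.trim()).asSubclass(Committer.class);
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Cannot load committer class: " + className, e);
    }
  }

  public static void main(String[] args) {
    // No property set: default committer.
    System.out.println(create(Map.of()).name());
    // Property set: the named class is loaded reflectively.
    Map<String, String> props = Map.of(COMMITTER_CLASS_PROP, MyCustomCommitter.class.getName());
    System.out.println(create(props).name());
  }
}
```

Because the default branch is taken whenever the property is missing, existing configurations never reach the reflective path in this sketch.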
# Usage Example
**Default behavior (no configuration changes): uses the default CommitterImpl**
iceberg.tables=my_db.my_table
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=s3://my-bucket/warehouse
**Custom committer: loads the class named by iceberg.committer.class**
iceberg.tables=my_db.my_table
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=s3://my-bucket/warehouse
iceberg.committer.class=com.example.MyCustomCommitter
# Testing
**Test coverage includes:**
- ✅ Default committer instantiation and operation
- ✅ Custom committer lifecycle
- ✅ Factory error handling
- ✅ Configuration validation
- ✅ Complete lifecycle flows
- ✅ Instance isolation
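
As a rough illustration of what "factory error handling" and "instance isolation" checks might look like, here is a self-contained sketch. It does not reproduce the PR's test classes: `Committer` is an empty stand-in interface, and `CountingCommitter`, `NotACommitter`, `load`, and `rejects` are hypothetical names introduced only for this example.

```java
public class CommitterLoadingChecks {

  /** Empty stand-in for the connector's Committer interface (hypothetical). */
  public interface Committer {}

  /** Hypothetical committer that carries per-instance state. */
  public static class CountingCommitter implements Committer {
    int saved = 0;
  }

  /** A class that does not implement Committer and should be rejected. */
  public static class NotACommitter {}

  /** Loads a committer reflectively, wrapping any failure in one exception type. */
  static Committer load(String className) {
    try {
      return Class.forName(className)
          .asSubclass(Committer.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Invalid committer class: " + className, e);
    }
  }

  /** Returns true when loading the given class name fails as expected. */
  static boolean rejects(String className) {
    try {
      load(className);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    // Error handling: unknown classes and wrong types are both rejected.
    check(rejects("com.example.DoesNotExist"), "unknown class should be rejected");
    check(rejects(NotACommitter.class.getName()), "non-committer class should be rejected");

    // Instance isolation: each load yields a fresh object with its own state.
    CountingCommitter first = (CountingCommitter) load(CountingCommitter.class.getName());
    CountingCommitter second = (CountingCommitter) load(CountingCommitter.class.getName());
    first.saved = 5;
    check(first != second, "each load should create a new instance");
    check(second.saved == 0, "state must not leak between instances");
  }
}
```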
# Files Changed
**Modified:**
- kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
- kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
**Added:**
- kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/TestCustomCommitter.java
- kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/TestCommitterFactory.java
- kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitter.java
**Removed:**
- kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java