kumarpritam863 opened a new pull request, #15207:
URL: https://github.com/apache/iceberg/pull/15207

   # Summary
   
     This PR adds support for pluggable Committer implementations in the 
Iceberg Kafka Connect connector, allowing users to provide custom commit 
strategies via configuration. Previously, the connector was hardcoded to use 
**CommitterImpl**. This change enables users to implement and configure custom 
committers without modifying the connector source code.
   
   # Motivation
   
     Different use cases may require different commit strategies:
     - Alternative coordination mechanisms - Users may want to use external 
consensus systems (e.g., ZooKeeper, etcd, Apache Ratis)
     - Custom commit policies - Time-based, size-based, or 
business-logic-driven commit strategies
     - Enhanced monitoring - Custom metrics collection and observability
     - Specialized error handling - Domain-specific retry logic and failure 
recovery
   
     The current implementation hardcodes CommitterImpl, so users must fork 
the codebase to implement custom commit logic. This PR makes the committer 
pluggable while maintaining full backward compatibility.
   
   # Backward Compatibility
   
     This change is fully backward compatible:
     - When iceberg.committer.class is not configured, the connector uses the 
default CommitterImpl
     - Existing configurations work without modifications
     - No changes to the Committer interface
     - No breaking API changes
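
The fallback behavior described above can be illustrated with a minimal, self-contained sketch. It is not the PR's actual `CommitterFactory` code: the class `CommitterFactorySketch`, the one-method `Committer` stand-in interface, and `DefaultCommitter` (standing in for `CommitterImpl`) are all hypothetical names chosen for illustration. Only the property name `iceberg.committer.class` comes from this description.

```java
import java.util.Map;

public class CommitterFactorySketch {

  /** Hypothetical stand-in for the connector's Committer interface. */
  public interface Committer {
    String name();
  }

  /** Hypothetical stand-in for the built-in default (CommitterImpl in the PR). */
  public static class DefaultCommitter implements Committer {
    @Override
    public String name() {
      return "default";
    }
  }

  /** Example of a user-supplied committer with a public no-arg constructor. */
  public static class MyCustomCommitter implements Committer {
    @Override
    public String name() {
      return "custom";
    }
  }

  static final String COMMITTER_CLASS_PROP = "iceberg.committer.class";

  /** Returns the default committer unless a class name is configured. */
  public static Committer createCommitter(Map<String, String> props) {
    String className = props.get(COMMITTER_CLASS_PROP);
    if (className == null || className.trim().isEmpty()) {
      return new DefaultCommitter(); // backward-compatible path
    }
    try {
      Class<?> clazz = Class.forName(className.trim());
      if (!Committer.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(className + " does not implement Committer");
      }
      return (Committer) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Covers a missing class, a missing no-arg constructor, and constructor failures.
      throw new IllegalArgumentException("Cannot instantiate committer: " + className, e);
    }
  }

  public static void main(String[] args) {
    // No property set: falls back to the default committer.
    System.out.println(createCommitter(Map.of()).name()); // prints "default"

    // Property set: the named class is loaded reflectively.
    Map<String, String> props =
        Map.of(COMMITTER_CLASS_PROP, MyCustomCommitter.class.getName());
    System.out.println(createCommitter(props).name()); // prints "custom"
  }
}
```

In this sketch a misconfigured class name fails fast with an `IllegalArgumentException` rather than silently falling back to the default, which is one plausible way to surface configuration mistakes at startup; whether the PR makes the same choice is not stated here.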
   
   # Usage Example
   
     **Default behavior (no configuration changes), uses the default CommitterImpl:**
     iceberg.tables=my_db.my_table
     iceberg.catalog.type=hadoop
     iceberg.catalog.warehouse=s3://my-bucket/warehouse
   
     **Custom committer, loads the class named by iceberg.committer.class:**
     iceberg.tables=my_db.my_table
     iceberg.catalog.type=hadoop
     iceberg.catalog.warehouse=s3://my-bucket/warehouse
     iceberg.committer.class=com.example.MyCustomCommitter
   
   # Testing
     **Test coverage includes:**
     - ✅ Default committer instantiation and operation
     - ✅ Custom committer lifecycle
     - ✅ Factory error handling
     - ✅ Configuration validation
     - ✅ Complete lifecycle flows
     - ✅ Instance isolation
   
   # Files Changed
   
     **Modified:**
     - kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
     - kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
   
     **Added:**
     - kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/TestCustomCommitter.java
     - kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/TestCommitterFactory.java
     - kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitter.java
   
     **Removed:**
     - kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

