This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d8e7642d4e Kafka Connect: Commit coordination (#10351)
d8e7642d4e is described below

commit d8e7642d4e61b6d442c38b2cab1f631864c4e368
Author: Bryan Keller <[email protected]>
AuthorDate: Thu Jul 11 08:22:20 2024 -0700

    Kafka Connect: Commit coordination (#10351)
    
    * Kafka Connect: Commit coordination
    
    * PR feedback
    
    * Get partition assignment from consumer
    
    * test fix
---
 .../iceberg/connect/events/TableReference.java     |  20 ++
 .../iceberg/connect/events/EventTestUtil.java      |   3 +-
 .../org/apache/iceberg/connect/CatalogUtils.java   |  98 +++++++
 .../{data/NoOpWriter.java => Committer.java}       |  24 +-
 .../RecordWriter.java => CommitterFactory.java}    |  16 +-
 .../apache/iceberg/connect/IcebergSinkConfig.java  |  23 +-
 .../iceberg/connect/IcebergSinkConnector.java      |   4 +-
 .../apache/iceberg/connect/IcebergSinkTask.java    | 107 +++++++
 .../apache/iceberg/connect/channel/Channel.java    | 167 +++++++++++
 .../iceberg/connect/channel/CommitState.java       | 167 +++++++++++
 .../iceberg/connect/channel/CommitterImpl.java     | 128 +++++++++
 .../iceberg/connect/channel/Coordinator.java       | 314 +++++++++++++++++++++
 .../iceberg/connect/channel/CoordinatorThread.java |  69 +++++
 .../NoOpWriter.java => channel/Envelope.java}      |  33 ++-
 .../connect/channel/KafkaClientFactory.java        |  68 +++++
 .../apache/iceberg/connect/channel/KafkaUtils.java |  66 +++++
 .../NotRunningException.java}                      |  16 +-
 .../org/apache/iceberg/connect/channel/Worker.java | 126 +++++++++
 .../apache/iceberg/connect/data/IcebergWriter.java |  14 +-
 .../iceberg/connect/data/IcebergWriterFactory.java |   7 +-
 ...{WriterResult.java => IcebergWriterResult.java} |   4 +-
 .../apache/iceberg/connect/data/NoOpWriter.java    |   5 +-
 .../data/{RecordWriter.java => Offset.java}        |  35 ++-
 .../data/{Utilities.java => RecordUtils.java}      |  78 +----
 .../apache/iceberg/connect/data/RecordWriter.java  |   4 +-
 .../apache/iceberg/connect/data/SinkWriter.java    | 140 +++++++++
 .../{NoOpWriter.java => SinkWriterResult.java}     |  26 +-
 .../UtilitiesTest.java => CatalogUtilsTest.java}   |  74 +----
 .../iceberg/connect/channel/ChannelTestBase.java   | 131 +++++++++
 .../iceberg/connect/channel/CommitStateTest.java   | 107 +++++++
 .../iceberg/connect/channel/CommitterImplTest.java |  60 ++++
 .../iceberg/connect/channel/CoordinatorTest.java   | 213 ++++++++++++++
 .../connect/channel/CoordinatorThreadTest.java     |  48 ++++
 .../iceberg/connect/channel}/EventTestUtil.java    |   5 +-
 .../apache/iceberg/connect/channel/WorkerTest.java | 116 ++++++++
 .../iceberg/connect/data/BaseWriterTest.java       |   2 +-
 .../iceberg/connect/data/RecordUtilsTest.java      |  93 ++++++
 .../iceberg/connect/data/SchemaUpdateTest.java     |  68 +++++
 .../iceberg/connect/data/SinkWriterTest.java       | 210 ++++++++++++++
 39 files changed, 2628 insertions(+), 261 deletions(-)

diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
index 50eaa10504..f30eac8924 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.connect.events;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -120,4 +121,23 @@ public class TableReference implements IndexedRecord {
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
     }
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TableReference that = (TableReference) o;
+    return Objects.equals(catalog, that.catalog)
+        && Objects.equals(namespace, that.namespace)
+        && Objects.equals(name, that.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(catalog, namespace, name);
+  }
 }
diff --git a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
index 8f1f7a601f..48e268bf05 100644
--- a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
+++ b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
@@ -44,8 +44,7 @@ class EventTestUtil {
   static final Schema SCHEMA =
       new Schema(ImmutableList.of(Types.NestedField.required(1, "id", Types.LongType.get())));
 
-  static final PartitionSpec SPEC =
-      PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build();
+  static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();
 
   static final SortOrder ORDER =
       SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC, NullOrder.NULLS_FIRST).build();
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java
new file mode 100644
index 0000000000..a3c6358e1b
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.common.DynMethods.BoundMethod;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CatalogUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogUtils.class.getName());
+  private static final List<String> HADOOP_CONF_FILES =
+      ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+  static Catalog loadCatalog(IcebergSinkConfig config) {
+    return CatalogUtil.buildIcebergCatalog(
+        config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
+  }
+
+  // use reflection here to avoid requiring Hadoop as a dependency
+  private static Object loadHadoopConfig(IcebergSinkConfig config) {
+    Class<?> configClass =
+        DynClasses.builder()
+            .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
+            .impl("org.apache.hadoop.conf.Configuration")
+            .orNull()
+            .build();
+
+    if (configClass == null) {
+      LOG.info("Hadoop not found on classpath, not creating Hadoop config");
+      return null;
+    }
+
+    try {
+      Object result = DynConstructors.builder().hiddenImpl(configClass).build().newInstance();
+      BoundMethod addResourceMethod =
+          DynMethods.builder("addResource").impl(configClass, URL.class).build(result);
+      BoundMethod setMethod =
+          DynMethods.builder("set").impl(configClass, String.class, String.class).build(result);
+
+      //  load any config files in the specified config directory
+      String hadoopConfDir = config.hadoopConfDir();
+      if (hadoopConfDir != null) {
+        HADOOP_CONF_FILES.forEach(
+            confFile -> {
+              Path path = Paths.get(hadoopConfDir, confFile);
+              if (Files.exists(path)) {
+                try {
+                  addResourceMethod.invoke(path.toUri().toURL());
+                } catch (IOException e) {
+                  LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e);
+                }
+              }
+            });
+      }
+
+      // set any Hadoop properties specified in the sink config
+      config.hadoopProps().forEach(setMethod::invoke);
+
+      LOG.info("Hadoop config initialized: {}", configClass.getName());
+      return result;
+    } catch (Exception e) {
+      LOG.warn(
+          "Hadoop found on classpath but could not create config, proceeding without config", e);
+    }
+    return null;
+  }
+
+  private CatalogUtils() {}
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java
similarity index 71%
copy from kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
copy to kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java
index 64ca44f032..edc217d1b0 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java
@@ -16,25 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect;
 
-import java.util.List;
+import java.util.Collection;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
 
-class NoOpWriter implements RecordWriter {
-  @Override
-  public void write(SinkRecord record) {
-    // NO-OP
-  }
+public interface Committer {
+  void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context);
 
-  @Override
-  public List<WriterResult> complete() {
-    // NO-OP
-    return null;
-  }
+  void stop();
 
-  @Override
-  public void close() {
-    // NO-OP
-  }
+  void save(Collection<SinkRecord> sinkRecords);
 }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
similarity index 76%
copy from kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
copy to kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
index 0b4d7566ea..18ff118c77 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
@@ -16,16 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect;
 
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.iceberg.connect.channel.CommitterImpl;
 
-public interface RecordWriter extends Cloneable {
+class CommitterFactory {
+  static Committer createCommitter(IcebergSinkConfig config) {
+    return new CommitterImpl();
+  }
 
-  void write(SinkRecord record);
-
-  List<WriterResult> complete();
-
-  void close();
+  private CommitterFactory() {}
 }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index e64e183089..aed11ab0b1 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -80,7 +80,6 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
       "iceberg.tables.schema-case-insensitive";
   private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
-  private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
   private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
   private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000;
   private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms";
@@ -104,11 +103,7 @@ public class IcebergSinkConfig extends AbstractConfig {
   public static final ConfigDef CONFIG_DEF = newConfigDef();
 
   public static String version() {
-    String kcVersion = IcebergSinkConfig.class.getPackage().getImplementationVersion();
-    if (kcVersion == null) {
-      kcVersion = "unknown";
-    }
-    return IcebergBuild.version() + "-kc-" + kcVersion;
+    return IcebergBuild.version();
   }
 
   private static ConfigDef newConfigDef() {
@@ -185,12 +180,6 @@ public class IcebergSinkConfig extends AbstractConfig {
         DEFAULT_CONTROL_TOPIC,
         Importance.MEDIUM,
         "Name of the control topic");
-    configDef.define(
-        CONTROL_GROUP_ID_PROP,
-        ConfigDef.Type.STRING,
-        null,
-        Importance.MEDIUM,
-        "Name of the consumer group to store offsets");
     configDef.define(
         CONNECT_GROUP_ID_PROP,
         ConfigDef.Type.STRING,
@@ -370,16 +359,6 @@ public class IcebergSinkConfig extends AbstractConfig {
     return getString(CONTROL_TOPIC_PROP);
   }
 
-  public String controlGroupId() {
-    String result = getString(CONTROL_GROUP_ID_PROP);
-    if (result != null) {
-      return result;
-    }
-    String connectorName = connectorName();
-    Preconditions.checkNotNull(connectorName, "Connector name cannot be null");
-    return DEFAULT_CONTROL_GROUP_PREFIX + connectorName;
-  }
-
   public String connectGroupId() {
     String result = getString(CONNECT_GROUP_ID_PROP);
     if (result != null) {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
index 8be8518f44..be1f9a50b8 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
@@ -44,9 +44,7 @@ public class IcebergSinkConnector extends SinkConnector {
 
   @Override
   public Class<? extends Task> taskClass() {
-    // FIXME: update this when the connector channel is added
-    //  return IcebergSinkTask.class;
-    return null;
+    return IcebergSinkTask.class;
   }
 
   @Override
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java
new file mode 100644
index 0000000000..460b18fd7f
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergSinkTask extends SinkTask {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);
+
+  private IcebergSinkConfig config;
+  private Catalog catalog;
+  private Committer committer;
+
+  @Override
+  public String version() {
+    return IcebergSinkConfig.version();
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    this.config = new IcebergSinkConfig(props);
+  }
+
+  @Override
+  public void open(Collection<TopicPartition> partitions) {
+    Preconditions.checkArgument(catalog == null, "Catalog already open");
+    Preconditions.checkArgument(committer == null, "Committer already open");
+
+    catalog = CatalogUtils.loadCatalog(config);
+    committer = CommitterFactory.createCommitter(config);
+    committer.start(catalog, config, context);
+  }
+
+  @Override
+  public void close(Collection<TopicPartition> partitions) {
+    close();
+  }
+
+  private void close() {
+    if (committer != null) {
+      committer.stop();
+      committer = null;
+    }
+
+    if (catalog != null) {
+      if (catalog instanceof AutoCloseable) {
+        try {
+          ((AutoCloseable) catalog).close();
+        } catch (Exception e) {
+          LOG.warn("An error occurred closing catalog instance, ignoring...", e);
+        }
+      }
+      catalog = null;
+    }
+  }
+
+  @Override
+  public void put(Collection<SinkRecord> sinkRecords) {
+    Preconditions.checkNotNull(committer, "Committer wasn't initialized");
+    committer.save(sinkRecords);
+  }
+
+  @Override
+  public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    Preconditions.checkNotNull(committer, "Committer wasn't initialized");
+    committer.save(null);
+  }
+
+  @Override
+  public Map<TopicPartition, OffsetAndMetadata> preCommit(
+      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    // offset commit is handled by the worker
+    return ImmutableMap.of();
+  }
+
+  @Override
+  public void stop() {
+    close();
+  }
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
new file mode 100644
index 0000000000..993fcf67c9
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.Offset;
+import org.apache.iceberg.connect.events.AvroUtil;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class Channel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Channel.class);
+
+  private final String controlTopic;
+  private final String connectGroupId;
+  private final Producer<String, byte[]> producer;
+  private final Consumer<String, byte[]> consumer;
+  private final SinkTaskContext context;
+  private final Admin admin;
+  private final Map<Integer, Long> controlTopicOffsets = Maps.newHashMap();
+  private final String producerId;
+
+  Channel(
+      String name,
+      String consumerGroupId,
+      IcebergSinkConfig config,
+      KafkaClientFactory clientFactory,
+      SinkTaskContext context) {
+    this.controlTopic = config.controlTopic();
+    this.connectGroupId = config.connectGroupId();
+    this.context = context;
+
+    String transactionalId = name + config.transactionalSuffix();
+    this.producer = clientFactory.createProducer(transactionalId);
+    this.consumer = clientFactory.createConsumer(consumerGroupId);
+    this.admin = clientFactory.createAdmin();
+
+    this.producerId = UUID.randomUUID().toString();
+  }
+
+  protected void send(Event event) {
+    send(ImmutableList.of(event), ImmutableMap.of());
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffsets) {
+    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Maps.newHashMap();
+    sourceOffsets.forEach((k, v) -> offsetsToCommit.put(k, new OffsetAndMetadata(v.offset())));
+
+    List<ProducerRecord<String, byte[]>> recordList =
+        events.stream()
+            .map(
+                event -> {
+                  LOG.info("Sending event of type: {}", event.type().name());
+                  byte[] data = AvroUtil.encode(event);
+                  // key by producer ID to keep event order
+                  return new ProducerRecord<>(controlTopic, producerId, data);
+                })
+            .collect(Collectors.toList());
+
+    synchronized (producer) {
+      producer.beginTransaction();
+      try {
+        // NOTE: we shouldn't call get() on the future in a transactional context,
+        // see docs for org.apache.kafka.clients.producer.KafkaProducer
+        recordList.forEach(producer::send);
+        if (!sourceOffsets.isEmpty()) {
+          producer.sendOffsetsToTransaction(
+              offsetsToCommit, KafkaUtils.consumerGroupMetadata(context));
+        }
+        producer.commitTransaction();
+      } catch (Exception e) {
+        try {
+          producer.abortTransaction();
+        } catch (Exception ex) {
+          LOG.warn("Error aborting producer transaction", ex);
+        }
+        throw e;
+      }
+    }
+  }
+
+  protected abstract boolean receive(Envelope envelope);
+
+  protected void consumeAvailable(Duration pollDuration) {
+    ConsumerRecords<String, byte[]> records = consumer.poll(pollDuration);
+    while (!records.isEmpty()) {
+      records.forEach(
+          record -> {
+            // the consumer stores the offsets that corresponds to the next record to consume,
+            // so increment the record offset by one
+            controlTopicOffsets.put(record.partition(), record.offset() + 1);
+
+            Event event = AvroUtil.decode(record.value());
+
+            if (event.groupId().equals(connectGroupId)) {
+              LOG.debug("Received event of type: {}", event.type().name());
+              if (receive(new Envelope(event, record.partition(), record.offset()))) {
+                LOG.info("Handled event of type: {}", event.type().name());
+              }
+            }
+          });
+      records = consumer.poll(pollDuration);
+    }
+  }
+
+  protected Map<Integer, Long> controlTopicOffsets() {
+    return controlTopicOffsets;
+  }
+
+  protected void commitConsumerOffsets() {
+    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Maps.newHashMap();
+    controlTopicOffsets()
+        .forEach(
+            (k, v) ->
+                offsetsToCommit.put(new TopicPartition(controlTopic, k), new OffsetAndMetadata(v)));
+    consumer.commitSync(offsetsToCommit);
+  }
+
+  void start() {
+    consumer.subscribe(ImmutableList.of(controlTopic));
+
+    // initial poll with longer duration so the consumer will initialize...
+    consumeAvailable(Duration.ofSeconds(1));
+  }
+
+  void stop() {
+    LOG.info("Channel stopping");
+    producer.close();
+    consumer.close();
+    admin.close();
+  }
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java
new file mode 100644
index 0000000000..6cad33c3e3
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.time.OffsetDateTime;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CommitState {
+  private static final Logger LOG = LoggerFactory.getLogger(CommitState.class);
+
+  private final List<Envelope> commitBuffer = Lists.newArrayList();
+  private final List<DataComplete> readyBuffer = Lists.newArrayList();
+  private long startTime;
+  private UUID currentCommitId;
+  private final IcebergSinkConfig config;
+
+  CommitState(IcebergSinkConfig config) {
+    this.config = config;
+  }
+
+  void addResponse(Envelope envelope) {
+    commitBuffer.add(envelope);
+    if (!isCommitInProgress()) {
+      DataWritten dataWritten = (DataWritten) envelope.event().payload();
+      LOG.warn(
+          "Received commit response when no commit in progress, this can happen during recovery. Commit ID: {}",
+          dataWritten.commitId());
+    }
+  }
+
+  void addReady(Envelope envelope) {
+    DataComplete dataComplete = (DataComplete) envelope.event().payload();
+    readyBuffer.add(dataComplete);
+    if (!isCommitInProgress()) {
+      LOG.warn(
+          "Received commit ready when no commit in progress, this can happen during recovery. Commit ID: {}",
+          dataComplete.commitId());
+    }
+  }
+
+  UUID currentCommitId() {
+    return currentCommitId;
+  }
+
+  boolean isCommitInProgress() {
+    return currentCommitId != null;
+  }
+
+  boolean isCommitIntervalReached() {
+    if (startTime == 0) {
+      startTime = System.currentTimeMillis();
+    }
+
+    return (!isCommitInProgress()
+        && System.currentTimeMillis() - startTime >= config.commitIntervalMs());
+  }
+
+  void startNewCommit() {
+    currentCommitId = UUID.randomUUID();
+    startTime = System.currentTimeMillis();
+  }
+
+  void endCurrentCommit() {
+    readyBuffer.clear();
+    currentCommitId = null;
+  }
+
+  void clearResponses() {
+    commitBuffer.clear();
+  }
+
+  boolean isCommitTimedOut() {
+    if (!isCommitInProgress()) {
+      return false;
+    }
+
+    if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
+      LOG.info("Commit timeout reached. Commit ID: {}", currentCommitId);
+      return true;
+    }
+    return false;
+  }
+
+  boolean isCommitReady(int expectedPartitionCount) {
+    if (!isCommitInProgress()) {
+      return false;
+    }
+
+    int receivedPartitionCount =
+        readyBuffer.stream()
+            .filter(payload -> payload.commitId().equals(currentCommitId))
+            .mapToInt(payload -> payload.assignments().size())
+            .sum();
+
+    if (receivedPartitionCount >= expectedPartitionCount) {
+      LOG.info(
+          "Commit {} ready, received responses for all {} partitions",
+          currentCommitId,
+          receivedPartitionCount);
+      return true;
+    }
+
+    LOG.info(
+        "Commit {} not ready, received responses for {} of {} partitions, waiting for more",
+        currentCommitId,
+        receivedPartitionCount,
+        expectedPartitionCount);
+
+    return false;
+  }
+
+  Map<TableReference, List<Envelope>> tableCommitMap() {
+    return commitBuffer.stream()
+        .collect(
+            Collectors.groupingBy(
+                envelope -> ((DataWritten) envelope.event().payload()).tableReference()));
+  }
+
+  OffsetDateTime validThroughTs(boolean partialCommit) {
+    boolean hasValidThroughTs =
+        !partialCommit
+            && readyBuffer.stream()
+                .flatMap(event -> event.assignments().stream())
+                .allMatch(offset -> offset.timestamp() != null);
+
+    OffsetDateTime result;
+    if (hasValidThroughTs) {
+      result =
+          readyBuffer.stream()
+              .flatMap(event -> event.assignments().stream())
+              .map(TopicPartitionOffset::timestamp)
+              .min(Comparator.naturalOrder())
+              .orElse(null);
+    } else {
+      result = null;
+    }
+    return result;
+  }
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
new file mode 100644
index 0000000000..53b7b76e8e
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.Committer;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.SinkWriter;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Committer} implementation that always runs a commit {@link Worker} and, when this task
+ * is elected leader, additionally runs a {@link Coordinator} on a dedicated thread.
+ */
+public class CommitterImpl implements Committer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class);
+
+  // only set in the task that was elected leader during start()
+  private CoordinatorThread coordinatorThread;
+  private Worker worker;
+
+  /** Orders topic partitions by topic name first, then by partition number. */
+  static class TopicPartitionComparator implements Comparator<TopicPartition> {
+
+    @Override
+    public int compare(TopicPartition o1, TopicPartition o2) {
+      int result = o1.topic().compareTo(o2.topic());
+      if (result == 0) {
+        result = Integer.compare(o1.partition(), o2.partition());
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Starts the commit worker and, if this task is elected leader, the commit coordinator.
+   *
+   * <p>The connect consumer group is described through a short-lived admin client. Leader
+   * election is only attempted when the group state is {@code STABLE}; in any other state no
+   * coordinator is started by this call.
+   *
+   * @param catalog catalog handed to the coordinator and the sink writer
+   * @param config sink configuration
+   * @param context sink task context, used for the task's partition assignment
+   */
+  @Override
+  public void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context) {
+    KafkaClientFactory clientFactory = new KafkaClientFactory(config.kafkaProps());
+
+    ConsumerGroupDescription groupDesc;
+    try (Admin admin = clientFactory.createAdmin()) {
+      groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin);
+    }
+
+    if (groupDesc.state() == ConsumerGroupState.STABLE) {
+      Collection<MemberDescription> members = groupDesc.members();
+      Set<TopicPartition> partitions = context.assignment();
+      if (isLeader(members, partitions)) {
+        LOG.info("Task elected leader, starting commit coordinator");
+        Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory, context);
+        coordinatorThread = new CoordinatorThread(coordinator);
+        coordinatorThread.start();
+      }
+    }
+
+    LOG.info("Starting commit worker");
+    SinkWriter sinkWriter = new SinkWriter(catalog, config);
+    worker = new Worker(config, clientFactory, sinkWriter, context);
+    worker.start();
+  }
+
+  /**
+   * Hands a non-empty batch of records to the worker, then processes pending control events.
+   *
+   * @param sinkRecords records to save; null or empty batches are not passed to the worker
+   * @throws NotRunningException if the coordinator thread was started and has since terminated
+   */
+  @Override
+  public void save(Collection<SinkRecord> sinkRecords) {
+    if (sinkRecords != null && !sinkRecords.isEmpty()) {
+      worker.save(sinkRecords);
+    }
+    processControlEvents();
+  }
+
+  /**
+   * Stops the worker and asks the coordinator thread, if any, to terminate.
+   *
+   * <p>The coordinator thread is signalled even when stopping the worker throws, so a failure
+   * while stopping the worker cannot leave the coordinator thread running.
+   */
+  @Override
+  public void stop() {
+    try {
+      if (worker != null) {
+        worker.stop();
+        worker = null;
+      }
+    } finally {
+      if (coordinatorThread != null) {
+        coordinatorThread.terminate();
+        coordinatorThread = null;
+      }
+    }
+  }
+
+  /**
+   * Determines whether the task owning {@code partitions} is the leader.
+   *
+   * @param members members of the consumer group, with their assignments
+   * @param partitions topic partitions assigned to this task
+   * @return true if {@code partitions} contains the lowest topic partition, ordered by topic then
+   *     partition number, assigned to any member
+   * @throws ConnectException if no member has any topic partition assigned
+   */
+  @VisibleForTesting
+  boolean isLeader(Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
+    // there should only be one task assigned partition 0 of the first topic,
+    // so elect that one the leader
+    TopicPartition firstTopicPartition =
+        members.stream()
+            .flatMap(member -> member.assignment().topicPartitions().stream())
+            .min(new TopicPartitionComparator())
+            .orElseThrow(
+                () -> new ConnectException("No partitions assigned, cannot determine leader"));
+
+    return partitions.contains(firstTopicPartition);
+  }
+
+  /**
+   * Fails if the coordinator thread has terminated; otherwise lets the worker process events.
+   *
+   * @throws NotRunningException if a coordinator thread exists and reports itself terminated
+   */
+  private void processControlEvents() {
+    if (coordinatorThread != null && coordinatorThread.isTerminated()) {
+      throw new NotRunningException("Coordinator unexpectedly terminated");
+    }
+    if (worker != null) {
+      worker.process();
+    }
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
new file mode 100644
index 0000000000..7274f77e0c
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.CommitComplete;
+import org.apache.iceberg.connect.events.CommitToTable;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Channel that drives commits: when the commit interval is reached it sends a {@link
+ * StartCommit} event, it buffers the data-written and data-complete responses it receives, and
+ * it commits the reported files to the Iceberg tables.
+ */
+class Coordinator extends Channel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
+  private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts";
+  private static final Duration POLL_DURATION = Duration.ofSeconds(1);
+
+  private final Catalog catalog;
+  private final IcebergSinkConfig config;
+  // number of topic partitions assigned across all group members at construction time
+  private final int totalPartitionCount;
+  // snapshot summary property under which the control topic offsets are stored
+  private final String snapshotOffsetsProp;
+  private final ExecutorService exec;
+  private final CommitState commitState;
+
+  /**
+   * Creates a coordinator.
+   *
+   * @param catalog catalog used to load the tables being committed to
+   * @param config sink configuration
+   * @param members consumer group members; their assignments determine the total partition count
+   * @param clientFactory factory passed to the channel for creating Kafka clients
+   * @param context sink task context passed to the channel
+   */
+  Coordinator(
+      Catalog catalog,
+      IcebergSinkConfig config,
+      Collection<MemberDescription> members,
+      KafkaClientFactory clientFactory,
+      SinkTaskContext context) {
+    // pass consumer group ID to which we commit low watermark offsets
+    super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context);
+
+    this.catalog = catalog;
+    this.config = config;
+    this.totalPartitionCount =
+        members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum();
+    this.snapshotOffsetsProp =
+        String.format(
+            "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId());
+    this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.commitThreads());
+    this.commitState = new CommitState(config);
+  }
+
+  /**
+   * Runs one coordinator cycle: starts a new commit if the commit interval was reached, consumes
+   * available events, and performs a partial commit if the current commit timed out.
+   */
+  void process() {
+    if (commitState.isCommitIntervalReached()) {
+      // send out begin commit
+      commitState.startNewCommit();
+      Event event =
+          new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId()));
+      send(event);
+      LOG.info("Commit {} initiated", commitState.currentCommitId());
+    }
+
+    consumeAvailable(POLL_DURATION);
+
+    if (commitState.isCommitTimedOut()) {
+      commit(true);
+    }
+  }
+
+  /**
+   * Handles an incoming event.
+   *
+   * <p>Data-written events are added to the commit state as responses. Data-complete events are
+   * added as ready markers and trigger a full commit once the commit state reports the commit as
+   * ready for the total partition count.
+   *
+   * @param envelope envelope wrapping the received event
+   * @return true for data-written and data-complete events, false for any other payload type
+   */
+  @Override
+  protected boolean receive(Envelope envelope) {
+    switch (envelope.event().payload().type()) {
+      case DATA_WRITTEN:
+        commitState.addResponse(envelope);
+        return true;
+      case DATA_COMPLETE:
+        commitState.addReady(envelope);
+        if (commitState.isCommitReady(totalPartitionCount)) {
+          commit(false);
+        }
+        return true;
+    }
+    return false;
+  }
+
+  /**
+   * Runs a commit, logging any failure as a warning instead of propagating it, and always ends
+   * the current commit afterwards.
+   *
+   * @param partialCommit whether this commit is partial, i.e. triggered by the commit timeout
+   */
+  private void commit(boolean partialCommit) {
+    try {
+      doCommit(partialCommit);
+    } catch (Exception e) {
+      LOG.warn("Commit failed, will try again next cycle", e);
+    } finally {
+      commitState.endCurrentCommit();
+    }
+  }
+
+  /**
+   * Commits the buffered responses to every referenced table on the executor, then commits the
+   * consumer offsets, clears the buffered responses, and sends a {@link CommitComplete} event.
+   *
+   * @param partialCommit whether this commit is partial; passed on when computing valid-through
+   */
+  private void doCommit(boolean partialCommit) {
+    Map<TableReference, List<Envelope>> commitMap = commitState.tableCommitMap();
+
+    String offsetsJson = offsetsJson();
+    OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit);
+
+    Tasks.foreach(commitMap.entrySet())
+        .executeWith(exec)
+        .stopOnFailure()
+        .run(
+            entry -> {
+              commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs);
+            });
+
+    // we should only get here if all tables committed successfully...
+    commitConsumerOffsets();
+    commitState.clearResponses();
+
+    Event event =
+        new Event(
+            config.connectGroupId(),
+            new CommitComplete(commitState.currentCommitId(), validThroughTs));
+    send(event);
+
+    LOG.info(
+        "Commit {} complete, committed to {} table(s), valid-through {}",
+        commitState.currentCommitId(),
+        commitMap.size(),
+        validThroughTs);
+  }
+
+  /**
+   * Serializes the control topic offsets to JSON.
+   *
+   * @return JSON form of the value returned by {@code controlTopicOffsets()}
+   * @throws UncheckedIOException if serialization fails
+   */
+  private String offsetsJson() {
+    try {
+      return MAPPER.writeValueAsString(controlTopicOffsets());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Commits the files reported for one table and sends a {@link CommitToTable} event.
+   *
+   * <p>A table that cannot be found is skipped with a warning. Envelopes whose offset is below
+   * the offset recorded for their partition in the table's snapshot history are ignored. Files
+   * with a record count of zero are dropped and files are de-duplicated by path. An append is
+   * used when there are no delete files, a row delta otherwise.
+   *
+   * @param tableReference table to commit to
+   * @param envelopeList envelopes with data-written payloads for the table
+   * @param offsetsJson control topic offsets, stored in the snapshot summary
+   * @param validThroughTs valid-through timestamp, stored in the snapshot summary when non-null
+   */
+  private void commitToTable(
+      TableReference tableReference,
+      List<Envelope> envelopeList,
+      String offsetsJson,
+      OffsetDateTime validThroughTs) {
+    TableIdentifier tableIdentifier = tableReference.identifier();
+    Table table;
+    try {
+      table = catalog.loadTable(tableIdentifier);
+    } catch (NoSuchTableException e) {
+      LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e);
+      return;
+    }
+
+    String branch = config.tableConfig(tableIdentifier.toString()).commitBranch();
+
+    Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);
+
+    // keep only responses at or beyond the offsets already recorded in the table's snapshots
+    List<DataWritten> payloads =
+        envelopeList.stream()
+            .filter(
+                envelope -> {
+                  Long minOffset = committedOffsets.get(envelope.partition());
+                  return minOffset == null || envelope.offset() >= minOffset;
+                })
+            .map(envelope -> (DataWritten) envelope.event().payload())
+            .collect(Collectors.toList());
+
+    List<DataFile> dataFiles =
+        payloads.stream()
+            .filter(payload -> payload.dataFiles() != null)
+            .flatMap(payload -> payload.dataFiles().stream())
+            .filter(dataFile -> dataFile.recordCount() > 0)
+            .filter(distinctByKey(dataFile -> dataFile.path().toString()))
+            .collect(Collectors.toList());
+
+    List<DeleteFile> deleteFiles =
+        payloads.stream()
+            .filter(payload -> payload.deleteFiles() != null)
+            .flatMap(payload -> payload.deleteFiles().stream())
+            .filter(deleteFile -> deleteFile.recordCount() > 0)
+            .filter(distinctByKey(deleteFile -> deleteFile.path().toString()))
+            .collect(Collectors.toList());
+
+    if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
+      LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
+    } else {
+      if (deleteFiles.isEmpty()) {
+        AppendFiles appendOp = table.newAppend();
+        if (branch != null) {
+          appendOp.toBranch(branch);
+        }
+        appendOp.set(snapshotOffsetsProp, offsetsJson);
+        appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
+        if (validThroughTs != null) {
+          appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
+        }
+        dataFiles.forEach(appendOp::appendFile);
+        appendOp.commit();
+      } else {
+        RowDelta deltaOp = table.newRowDelta();
+        if (branch != null) {
+          deltaOp.toBranch(branch);
+        }
+        deltaOp.set(snapshotOffsetsProp, offsetsJson);
+        deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
+        if (validThroughTs != null) {
+          deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
+        }
+        dataFiles.forEach(deltaOp::addRows);
+        deleteFiles.forEach(deltaOp::addDeletes);
+        deltaOp.commit();
+      }
+
+      Long snapshotId = latestSnapshot(table, branch).snapshotId();
+      Event event =
+          new Event(
+              config.connectGroupId(),
+              new CommitToTable(
+                  commitState.currentCommitId(), tableReference, snapshotId, validThroughTs));
+      send(event);
+
+      LOG.info(
+          "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}",
+          tableIdentifier,
+          snapshotId,
+          commitState.currentCommitId(),
+          validThroughTs);
+    }
+  }
+
+  /**
+   * Creates a stateful predicate that accepts an element only the first time its key is seen.
+   *
+   * @param keyExtractor function producing the key used to detect duplicates
+   * @return predicate that is true for the first element with a given key and false afterwards
+   */
+  private <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
+    Map<Object, Boolean> seen = Maps.newConcurrentMap();
+    return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
+  }
+
+  /**
+   * Returns the table's current snapshot, or the snapshot of the given branch when one is set.
+   *
+   * @param table table to inspect
+   * @param branch branch name, or null for the table's current snapshot
+   */
+  private Snapshot latestSnapshot(Table table, String branch) {
+    if (branch == null) {
+      return table.currentSnapshot();
+    }
+    return table.snapshot(branch);
+  }
+
+  /**
+   * Finds the most recently recorded control topic offsets for a table.
+   *
+   * <p>Walks from the latest snapshot back through its parents until a snapshot summary contains
+   * the offsets property, and parses that value.
+   *
+   * @param table table to inspect
+   * @param branch branch name, or null for the table's current snapshot
+   * @return offsets keyed by partition, or an empty map if no snapshot carries the property
+   * @throws UncheckedIOException if the stored value cannot be parsed
+   */
+  private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String branch) {
+    Snapshot snapshot = latestSnapshot(table, branch);
+    while (snapshot != null) {
+      Map<String, String> summary = snapshot.summary();
+      String value = summary.get(snapshotOffsetsProp);
+      if (value != null) {
+        TypeReference<Map<Integer, Long>> typeRef = new TypeReference<Map<Integer, Long>>() {};
+        try {
+          return MAPPER.readValue(value, typeRef);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      Long parentSnapshotId = snapshot.parentId();
+      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
+    }
+    return ImmutableMap.of();
+  }
+
+  /**
+   * Shuts down the commit executor, waits up to one minute for it to terminate, then stops the
+   * channel.
+   *
+   * @throws RuntimeException if the executor does not terminate in time or the wait is
+   *     interrupted; in the latter case the thread's interrupt flag is restored first
+   */
+  @Override
+  void stop() {
+    exec.shutdownNow();
+
+    // ensure coordinator tasks are shut down, else cause the sink worker to fail
+    try {
+      if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
+        throw new RuntimeException("Timed out waiting for coordinator shutdown");
+      }
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so callers further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while waiting for coordinator shutdown", e);
+    }
+
+    super.stop();
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java
new file mode 100644
index 0000000000..6a31b17fc6
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread that starts a {@link Coordinator}, calls its {@code process()} method in a loop until
+ * terminated, and then stops it.
+ */
+class CoordinatorThread extends Thread {
+  private static final Logger LOG = LoggerFactory.getLogger(CoordinatorThread.class);
+  private static final String THREAD_NAME = "iceberg-coord";
+
+  private Coordinator coordinator;
+  // volatile: written by terminate() from another thread and read by the run loop
+  private volatile boolean terminated;
+
+  /**
+   * Creates the thread; the coordinator is not started until the thread runs.
+   *
+   * @param coordinator coordinator to drive from this thread
+   */
+  CoordinatorThread(Coordinator coordinator) {
+    super(THREAD_NAME);
+    this.coordinator = coordinator;
+  }
+
+  /**
+   * Starts the coordinator, processes until terminated or until an exception occurs, and finally
+   * stops the coordinator.
+   *
+   * <p>Exceptions from start and process are logged and end the loop; exceptions from stop are
+   * logged and ignored. The terminated flag is also set, and the coordinator stopped, when a
+   * throwable that is not an {@link Exception} escapes, so that {@link #isTerminated()} never
+   * reports a dead thread as running.
+   */
+  @Override
+  public void run() {
+    try {
+      try {
+        coordinator.start();
+      } catch (Exception e) {
+        LOG.error("Coordinator error during start, exiting thread", e);
+        terminated = true;
+      }
+
+      while (!terminated) {
+        try {
+          coordinator.process();
+        } catch (Exception e) {
+          LOG.error("Coordinator error during process, exiting thread", e);
+          terminated = true;
+        }
+      }
+    } finally {
+      // also reached when an Error bypasses the catch blocks above
+      terminated = true;
+
+      try {
+        coordinator.stop();
+      } catch (Exception e) {
+        LOG.error("Coordinator error during stop, ignoring", e);
+      }
+      coordinator = null;
+    }
+  }
+
+  /** Returns whether termination was requested or the run loop ended because of a failure. */
+  boolean isTerminated() {
+    return terminated;
+  }
+
+  /** Requests the run loop to exit; the coordinator is stopped by the thread itself. */
+  void terminate() {
+    terminated = true;
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java
similarity index 64%
copy from 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
copy to 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java
index 64ca44f032..87a93d0585 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java
@@ -16,25 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect.channel;
 
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.iceberg.connect.events.Event;
 
-class NoOpWriter implements RecordWriter {
-  @Override
-  public void write(SinkRecord record) {
-    // NO-OP
+/**
+ * Value holder pairing an {@link Event} with a partition number and an offset.
+ *
+ * <p>NOTE(review): presumably the partition and offset locate the event's record in the control
+ * topic -- confirm against the code that constructs envelopes.
+ */
+class Envelope {
+  private final Event event;
+  private final int partition;
+  private final long offset;
+
+  Envelope(Event event, int partition, long offset) {
+    this.event = event;
+    this.partition = partition;
+    this.offset = offset;
+  }
+
+  /** Returns the wrapped event. */
+  Event event() {
+    return event;
   }
 
-  @Override
-  public List<WriterResult> complete() {
-    // NO-OP
-    return null;
+  /** Returns the partition number given at construction. */
+  int partition() {
+    return partition;
   }
 
-  @Override
-  public void close() {
-    // NO-OP
+  /** Returns the offset given at construction. */
+  long offset() {
+    return offset;
   }
 }
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java
new file mode 100644
index 0000000000..fd5d27ae34
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+/**
+ * Creates Kafka producers, consumers, and admin clients from a shared set of client properties.
+ */
+class KafkaClientFactory {
+  private final Map<String, String> kafkaProps;
+
+  /**
+   * Creates a factory.
+   *
+   * @param kafkaProps base client properties copied into every client this factory creates
+   */
+  KafkaClientFactory(Map<String, String> kafkaProps) {
+    this.kafkaProps = kafkaProps;
+  }
+
+  /**
+   * Creates a transactional producer and initializes its transactions.
+   *
+   * <p>A random client ID is used unless the base properties already define one; the
+   * transactional ID always overrides any value in the base properties. If initializing
+   * transactions fails, the producer is closed before the failure is rethrown so that it is not
+   * leaked.
+   *
+   * @param transactionalId transactional ID for the producer
+   * @return a producer on which {@code initTransactions()} has completed
+   */
+  Producer<String, byte[]> createProducer(String transactionalId) {
+    Map<String, Object> producerProps = Maps.newHashMap(kafkaProps);
+    producerProps.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+    KafkaProducer<String, byte[]> result =
+        new KafkaProducer<>(producerProps, new StringSerializer(), new ByteArraySerializer());
+    try {
+      result.initTransactions();
+    } catch (RuntimeException e) {
+      try {
+        // zero timeout: nothing was sent yet, and close must not block on a failed init
+        result.close(java.time.Duration.ZERO);
+      } catch (RuntimeException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    return result;
+  }
+
+  /**
+   * Creates a consumer for the given group.
+   *
+   * <p>Offset reset defaults to {@code latest} and the client ID to a random value unless the
+   * base properties define them. Auto commit is always disabled, the isolation level is always
+   * {@code read_committed}, and the group ID is always set to {@code consumerGroupId}.
+   *
+   * @param consumerGroupId consumer group ID for the consumer
+   * @return a new consumer
+   */
+  Consumer<String, byte[]> createConsumer(String consumerGroupId) {
+    Map<String, Object> consumerProps = Maps.newHashMap(kafkaProps);
+    consumerProps.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+    consumerProps.putIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
+    return new KafkaConsumer<>(
+        consumerProps, new StringDeserializer(), new ByteArrayDeserializer());
+  }
+
+  /**
+   * Creates an admin client from a copy of the base properties.
+   *
+   * @return a new admin client; the caller is responsible for closing it
+   */
+  Admin createAdmin() {
+    Map<String, Object> adminProps = Maps.newHashMap(kafkaProps);
+    return Admin.create(adminProps);
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
new file mode 100644
index 0000000000..be51fff8bf
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+/** Static helpers for looking up Kafka consumer group information. */
+class KafkaUtils {
+
+  private static final String CONTEXT_CLASS_NAME =
+      "org.apache.kafka.connect.runtime.WorkerSinkTaskContext";
+
+  /**
+   * Describes a consumer group, blocking until the admin client returns the description.
+   *
+   * @param consumerGroupId ID of the consumer group to describe
+   * @param admin admin client used for the lookup; not closed by this method
+   * @return description of the consumer group
+   * @throws ConnectException if the lookup fails or the calling thread is interrupted while
+   *     waiting; in the latter case the thread's interrupt flag is restored first
+   */
+  static ConsumerGroupDescription consumerGroupDescription(String consumerGroupId, Admin admin) {
+    try {
+      DescribeConsumerGroupsResult result =
+          admin.describeConsumerGroups(ImmutableList.of(consumerGroupId));
+      return result.describedGroups().get(consumerGroupId).get();
+
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so callers further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new ConnectException(
+          "Cannot retrieve members for consumer group: " + consumerGroupId, e);
+    } catch (ExecutionException e) {
+      throw new ConnectException(
+          "Cannot retrieve members for consumer group: " + consumerGroupId, e);
+    }
+  }
+
+  /**
+   * Returns the group metadata of the consumer held by the given sink task context.
+   *
+   * @param context sink task context to read the consumer from
+   * @throws ConnectException if the consumer cannot be retrieved from the context
+   */
+  static ConsumerGroupMetadata consumerGroupMetadata(SinkTaskContext context) {
+    return kafkaConsumer(context).groupMetadata();
+  }
+
+  /**
+   * Reads the {@code consumer} field of the context through {@link DynFields}, looking it up on
+   * the Connect runtime's sink task context class.
+   *
+   * @param context sink task context to read the consumer from
+   * @throws ConnectException if the field cannot be read; the message names the context's class
+   */
+  @SuppressWarnings("unchecked")
+  private static Consumer<byte[], byte[]> kafkaConsumer(SinkTaskContext context) {
+    String contextClassName = context.getClass().getName();
+    try {
+      return ((Consumer<byte[], byte[]>)
+          DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME, "consumer").build(context).get());
+    } catch (Exception e) {
+      throw new ConnectException(
+          "Unable to retrieve consumer from context: " + contextClassName, e);
+    }
+  }
+
+  private KafkaUtils() {}
+}
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java
similarity index 76%
copy from 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
copy to 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java
index 0b4d7566ea..72a362ceac 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java
@@ -16,16 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect.channel;
 
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
-
-public interface RecordWriter extends Cloneable {
-
-  void write(SinkRecord record);
-
-  List<WriterResult> complete();
-
-  void close();
+/**
+ * Unchecked exception signalling that a component expected to be running is not. {@code
+ * CommitterImpl} throws it when its coordinator thread has terminated.
+ */
+public class NotRunningException extends RuntimeException {
+  /**
+   * Creates the exception.
+   *
+   * @param msg detail message
+   */
+  public NotRunningException(String msg) {
+    super(msg);
+  }
 }
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
new file mode 100644
index 0000000000..7555b216cd
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.Offset;
+import org.apache.iceberg.connect.data.SinkWriter;
+import org.apache.iceberg.connect.data.SinkWriterResult;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.PayloadType;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+/**
+ * Channel that writes sink records through a {@link SinkWriter} and answers each {@link
+ * StartCommit} event with the write results and the offsets of its assigned partitions.
+ */
+class Worker extends Channel {
+
+  private final IcebergSinkConfig config;
+  private final SinkTaskContext context;
+  private final SinkWriter sinkWriter;
+
+  /**
+   * Creates a worker.
+   *
+   * @param config sink configuration
+   * @param clientFactory factory passed to the channel for creating Kafka clients
+   * @param sinkWriter writer that receives the sink records; closed when the worker stops
+   * @param context sink task context, used for the task's partition assignment
+   */
+  Worker(
+      IcebergSinkConfig config,
+      KafkaClientFactory clientFactory,
+      SinkWriter sinkWriter,
+      SinkTaskContext context) {
+    // pass transient consumer group ID to which we never commit offsets
+    super(
+        "worker",
+        IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(),
+        config,
+        clientFactory,
+        context);
+
+    this.config = config;
+    this.context = context;
+    this.sinkWriter = sinkWriter;
+  }
+
+  /** Consumes the events that are available right now, using a zero poll duration. */
+  void process() {
+    consumeAvailable(Duration.ZERO);
+  }
+
+  /**
+   * Responds to a start-commit event; all other events are ignored.
+   *
+   * <p>Completes the current write, then sends one {@link DataWritten} event per writer result
+   * followed by a single {@link DataComplete} event listing every assigned topic partition,
+   * together with the source offsets of the completed write.
+   *
+   * @param envelope envelope wrapping the received event
+   * @return true if the event was a start-commit event and was handled, false otherwise
+   */
+  @Override
+  protected boolean receive(Envelope envelope) {
+    Event event = envelope.event();
+    if (event.payload().type() != PayloadType.START_COMMIT) {
+      return false;
+    }
+
+    SinkWriterResult results = sinkWriter.completeWrite();
+
+    // include all assigned topic partitions even if no messages were read
+    // from a partition, as the coordinator will use that to determine
+    // when all data for a commit has been received
+    List<TopicPartitionOffset> assignments =
+        context.assignment().stream()
+            .map(
+                tp -> {
+                  Offset offset = results.sourceOffsets().get(tp);
+                  if (offset == null) {
+                    offset = Offset.NULL_OFFSET;
+                  }
+                  return new TopicPartitionOffset(
+                      tp.topic(), tp.partition(), offset.offset(), offset.timestamp());
+                })
+            .collect(Collectors.toList());
+
+    UUID commitId = ((StartCommit) event.payload()).commitId();
+
+    List<Event> events =
+        results.writerResults().stream()
+            .map(
+                writeResult ->
+                    new Event(
+                        config.connectGroupId(),
+                        new DataWritten(
+                            writeResult.partitionStruct(),
+                            commitId,
+                            TableReference.of(config.catalogName(), writeResult.tableIdentifier()),
+                            writeResult.dataFiles(),
+                            writeResult.deleteFiles())))
+            .collect(Collectors.toList());
+
+    Event readyEvent = new Event(config.connectGroupId(), new DataComplete(commitId, assignments));
+    events.add(readyEvent);
+
+    send(events, results.sourceOffsets());
+
+    return true;
+  }
+
+  /**
+   * Stops the channel and closes the sink writer.
+   *
+   * <p>The sink writer is closed even when stopping the channel throws, so a failure while
+   * stopping the channel cannot leave the writer open.
+   */
+  @Override
+  void stop() {
+    try {
+      super.stop();
+    } finally {
+      sinkWriter.close();
+    }
+  }
+
+  /**
+   * Passes the records to the sink writer.
+   *
+   * @param sinkRecords records to write
+   */
+  void save(Collection<SinkRecord> sinkRecords) {
+    sinkWriter.save(sinkRecords);
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
index 27ffc4de99..6df6b09151 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
@@ -32,16 +32,16 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 
-public class IcebergWriter implements RecordWriter {
+class IcebergWriter implements RecordWriter {
   private final Table table;
   private final String tableName;
   private final IcebergSinkConfig config;
-  private final List<WriterResult> writerResults;
+  private final List<IcebergWriterResult> writerResults;
 
   private RecordConverter recordConverter;
   private TaskWriter<Record> writer;
 
-  public IcebergWriter(Table table, String tableName, IcebergSinkConfig 
config) {
+  IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
     this.table = table;
     this.tableName = tableName;
     this.config = config;
@@ -50,7 +50,7 @@ public class IcebergWriter implements RecordWriter {
   }
 
   private void initNewWriter() {
-    this.writer = Utilities.createTableWriter(table, tableName, config);
+    this.writer = RecordUtils.createTableWriter(table, tableName, config);
     this.recordConverter = new RecordConverter(table, config);
   }
 
@@ -102,7 +102,7 @@ public class IcebergWriter implements RecordWriter {
     }
 
     writerResults.add(
-        new WriterResult(
+        new IcebergWriterResult(
             TableIdentifier.parse(tableName),
             Arrays.asList(writeResult.dataFiles()),
             Arrays.asList(writeResult.deleteFiles()),
@@ -110,10 +110,10 @@ public class IcebergWriter implements RecordWriter {
   }
 
   @Override
-  public List<WriterResult> complete() {
+  public List<IcebergWriterResult> complete() {
     flush();
 
-    List<WriterResult> result = Lists.newArrayList(writerResults);
+    List<IcebergWriterResult> result = Lists.newArrayList(writerResults);
     writerResults.clear();
 
     return result;
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index 47dcddcb99..92f5af2d7a 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -40,20 +40,19 @@ import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class IcebergWriterFactory {
+class IcebergWriterFactory {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergWriterFactory.class);
 
   private final Catalog catalog;
   private final IcebergSinkConfig config;
 
-  public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
+  IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
     this.catalog = catalog;
     this.config = config;
   }
 
-  public RecordWriter createWriter(
-      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+  RecordWriter createWriter(String tableName, SinkRecord sample, boolean 
ignoreMissingTable) {
     TableIdentifier identifier = TableIdentifier.parse(tableName);
     Table table;
     try {
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
similarity index 96%
rename from 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java
rename to 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
index cb3a700da2..58695a5572 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
@@ -24,14 +24,14 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types.StructType;
 
-public class WriterResult {
+public class IcebergWriterResult {
 
   private final TableIdentifier tableIdentifier;
   private final List<DataFile> dataFiles;
   private final List<DeleteFile> deleteFiles;
   private final StructType partitionStruct;
 
-  public WriterResult(
+  public IcebergWriterResult(
       TableIdentifier tableIdentifier,
       List<DataFile> dataFiles,
       List<DeleteFile> deleteFiles,
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
index 64ca44f032..a7d2c90972 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.connect.data;
 
 import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.kafka.connect.sink.SinkRecord;
 
 class NoOpWriter implements RecordWriter {
@@ -28,9 +29,9 @@ class NoOpWriter implements RecordWriter {
   }
 
   @Override
-  public List<WriterResult> complete() {
+  public List<IcebergWriterResult> complete() {
     // NO-OP
-    return null;
+    return ImmutableList.of();
   }
 
   @Override
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java
similarity index 55%
copy from 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
copy to 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java
index 0b4d7566ea..c4522a4071 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java
@@ -18,14 +18,37 @@
  */
 package org.apache.iceberg.connect.data;
 
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import java.time.OffsetDateTime;
+import java.util.Objects;
 
-public interface RecordWriter extends Cloneable {
+public class Offset implements Comparable<Offset> {
 
-  void write(SinkRecord record);
+  public static final Offset NULL_OFFSET = new Offset(null, null);
 
-  List<WriterResult> complete();
+  private final Long offset;
+  private final OffsetDateTime timestamp;
 
-  void close();
+  public Offset(Long offset, OffsetDateTime timestamp) {
+    this.offset = offset;
+    this.timestamp = timestamp;
+  }
+
+  public Long offset() {
+    return offset;
+  }
+
+  public OffsetDateTime timestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public int compareTo(Offset other) {
+    if (Objects.equals(this.offset, other.offset)) {
+      return 0;
+    }
+    if (this.offset == null || (other.offset != null && other.offset > 
this.offset)) {
+      return -1;
+    }
+    return 1;
+  }
 }
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
similarity index 68%
rename from 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java
rename to 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
index 4ff83f7775..5ac9307397 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
@@ -18,25 +18,14 @@
  */
 package org.apache.iceberg.connect.data;
 
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.common.DynClasses;
-import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.common.DynMethods.BoundMethod;
 import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
@@ -46,7 +35,6 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
@@ -55,71 +43,11 @@ import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Struct;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class Utilities {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(Utilities.class.getName());
-  private static final List<String> HADOOP_CONF_FILES =
-      ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
-
-  public static Catalog loadCatalog(IcebergSinkConfig config) {
-    return CatalogUtil.buildIcebergCatalog(
-        config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
-  }
-
-  // use reflection here to avoid requiring Hadoop as a dependency
-  private static Object loadHadoopConfig(IcebergSinkConfig config) {
-    Class<?> configClass =
-        DynClasses.builder()
-            .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
-            .impl("org.apache.hadoop.conf.Configuration")
-            .orNull()
-            .build();
-
-    if (configClass == null) {
-      LOG.info("Hadoop not found on classpath, not creating Hadoop config");
-      return null;
-    }
-
-    try {
-      Object result = 
DynConstructors.builder().hiddenImpl(configClass).build().newInstance();
-      BoundMethod addResourceMethod =
-          DynMethods.builder("addResource").impl(configClass, 
URL.class).build(result);
-      BoundMethod setMethod =
-          DynMethods.builder("set").impl(configClass, String.class, 
String.class).build(result);
-
-      //  load any config files in the specified config directory
-      String hadoopConfDir = config.hadoopConfDir();
-      if (hadoopConfDir != null) {
-        HADOOP_CONF_FILES.forEach(
-            confFile -> {
-              Path path = Paths.get(hadoopConfDir, confFile);
-              if (Files.exists(path)) {
-                try {
-                  addResourceMethod.invoke(path.toUri().toURL());
-                } catch (IOException e) {
-                  LOG.warn("Error adding Hadoop resource {}, resource was not 
added", path, e);
-                }
-              }
-            });
-      }
-
-      // set any Hadoop properties specified in the sink config
-      config.hadoopProps().forEach(setMethod::invoke);
-
-      LOG.info("Hadoop config initialized: {}", configClass.getName());
-      return result;
-    } catch (Exception e) {
-      LOG.warn(
-          "Hadoop found on classpath but could not create config, proceeding 
without config", e);
-    }
-    return null;
-  }
+class RecordUtils {
 
   @SuppressWarnings("unchecked")
-  public static Object extractFromRecordValue(Object recordValue, String 
fieldName) {
+  static Object extractFromRecordValue(Object recordValue, String fieldName) {
     List<String> fields = Splitter.on('.').splitToList(fieldName);
     if (recordValue instanceof Struct) {
       return valueFromStruct((Struct) recordValue, fields);
@@ -243,5 +171,5 @@ public class Utilities {
     return writer;
   }
 
-  private Utilities() {}
+  private RecordUtils() {}
 }
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
index 0b4d7566ea..56438dde2e 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
@@ -21,11 +21,11 @@ package org.apache.iceberg.connect.data;
 import java.util.List;
 import org.apache.kafka.connect.sink.SinkRecord;
 
-public interface RecordWriter extends Cloneable {
+interface RecordWriter extends Cloneable {
 
   void write(SinkRecord record);
 
-  List<WriterResult> complete();
+  List<IcebergWriterResult> complete();
 
   void close();
 }
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
new file mode 100644
index 0000000000..35a2957f01
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkWriter {
+  private final IcebergSinkConfig config;
+  private final IcebergWriterFactory writerFactory;
+  private final Map<String, RecordWriter> writers;
+  private final Map<TopicPartition, Offset> sourceOffsets;
+
+  public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
+    this.config = config;
+    this.writerFactory = new IcebergWriterFactory(catalog, config);
+    this.writers = Maps.newHashMap();
+    this.sourceOffsets = Maps.newHashMap();
+  }
+
+  public void close() {
+    writers.values().forEach(RecordWriter::close);
+  }
+
+  public SinkWriterResult completeWrite() {
+    List<IcebergWriterResult> writerResults =
+        writers.values().stream()
+            .flatMap(writer -> writer.complete().stream())
+            .collect(Collectors.toList());
+    Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
+
+    writers.clear();
+    sourceOffsets.clear();
+
+    return new SinkWriterResult(writerResults, offsets);
+  }
+
+  public void save(Collection<SinkRecord> sinkRecords) {
+    sinkRecords.forEach(this::save);
+  }
+
+  private void save(SinkRecord record) {
+    // the consumer stores the offset that corresponds to the next record to 
consume,
+    // so increment the record offset by one
+    OffsetDateTime timestamp =
+        record.timestamp() == null
+            ? null
+            : 
OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), 
ZoneOffset.UTC);
+    sourceOffsets.put(
+        new TopicPartition(record.topic(), record.kafkaPartition()),
+        new Offset(record.kafkaOffset() + 1, timestamp));
+
+    if (config.dynamicTablesEnabled()) {
+      routeRecordDynamically(record);
+    } else {
+      routeRecordStatically(record);
+    }
+  }
+
+  private void routeRecordStatically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+
+    if (routeField == null) {
+      // route to all tables
+      config
+          .tables()
+          .forEach(
+              tableName -> {
+                writerForTable(tableName, record, false).write(record);
+              });
+
+    } else {
+      String routeValue = extractRouteValue(record.value(), routeField);
+      if (routeValue != null) {
+        config
+            .tables()
+            .forEach(
+                tableName -> {
+                  Pattern regex = config.tableConfig(tableName).routeRegex();
+                  if (regex != null && regex.matcher(routeValue).matches()) {
+                    writerForTable(tableName, record, false).write(record);
+                  }
+                });
+      }
+    }
+  }
+
+  private void routeRecordDynamically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+    Preconditions.checkNotNull(routeField, "Route field cannot be null with 
dynamic routing");
+
+    String routeValue = extractRouteValue(record.value(), routeField);
+    if (routeValue != null) {
+      String tableName = routeValue.toLowerCase();
+      writerForTable(tableName, record, true).write(record);
+    }
+  }
+
+  private String extractRouteValue(Object recordValue, String routeField) {
+    if (recordValue == null) {
+      return null;
+    }
+    Object routeValue = RecordUtils.extractFromRecordValue(recordValue, 
routeField);
+    return routeValue == null ? null : routeValue.toString();
+  }
+
+  private RecordWriter writerForTable(
+      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+    return writers.computeIfAbsent(
+        tableName, notUsed -> writerFactory.createWriter(tableName, sample, 
ignoreMissingTable));
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java
similarity index 60%
copy from 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
copy to 
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java
index 64ca44f032..ef899102bb 100644
--- 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java
@@ -19,22 +19,24 @@
 package org.apache.iceberg.connect.data;
 
 import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
 
-class NoOpWriter implements RecordWriter {
-  @Override
-  public void write(SinkRecord record) {
-    // NO-OP
+public class SinkWriterResult {
+  private final List<IcebergWriterResult> writerResults;
+  private final Map<TopicPartition, Offset> sourceOffsets;
+
+  public SinkWriterResult(
+      List<IcebergWriterResult> writerResults, Map<TopicPartition, Offset> 
sourceOffsets) {
+    this.writerResults = writerResults;
+    this.sourceOffsets = sourceOffsets;
   }
 
-  @Override
-  public List<WriterResult> complete() {
-    // NO-OP
-    return null;
+  public List<IcebergWriterResult> writerResults() {
+    return writerResults;
   }
 
-  @Override
-  public void close() {
-    // NO-OP
+  public Map<TopicPartition, Offset> sourceOffsets() {
+    return sourceOffsets;
   }
 }
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java
similarity index 60%
rename from 
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java
rename to 
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java
index cfa1709da7..ce92b3efc3 100644
--- 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -27,19 +27,15 @@ import java.nio.file.Path;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.inmemory.InMemoryCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-public class UtilitiesTest {
+public class CatalogUtilsTest {
 
   private static final String HADOOP_CONF_TEMPLATE =
       
"<configuration><property><name>%s</name><value>%s</value></property></configuration>";
@@ -68,7 +64,7 @@ public class UtilitiesTest {
             "iceberg.catalog.catalog-impl",
             TestCatalog.class.getName());
     IcebergSinkConfig config = new IcebergSinkConfig(props);
-    Catalog result = Utilities.loadCatalog(config);
+    Catalog result = CatalogUtils.loadCatalog(config);
 
     assertThat(result).isInstanceOf(TestCatalog.class);
 
@@ -102,7 +98,7 @@ public class UtilitiesTest {
             "iceberg.catalog.catalog-impl",
             TestCatalog.class.getName());
     IcebergSinkConfig config = new IcebergSinkConfig(props);
-    Catalog result = Utilities.loadCatalog(config);
+    Catalog result = CatalogUtils.loadCatalog(config);
 
     assertThat(result).isInstanceOf(TestCatalog.class);
 
@@ -118,66 +114,4 @@ public class UtilitiesTest {
     // check that core-site.xml was loaded
     assertThat(conf.get("foo")).isEqualTo("bar");
   }
-
-  @Test
-  public void testExtractFromRecordValueStruct() {
-    Schema valSchema = SchemaBuilder.struct().field("key", 
Schema.INT64_SCHEMA).build();
-    Struct val = new Struct(valSchema).put("key", 123L);
-    Object result = Utilities.extractFromRecordValue(val, "key");
-    assertThat(result).isEqualTo(123L);
-  }
-
-  @Test
-  public void testExtractFromRecordValueStructNested() {
-    Schema idSchema = SchemaBuilder.struct().field("key", 
Schema.INT64_SCHEMA).build();
-    Schema dataSchema = SchemaBuilder.struct().field("id", idSchema).build();
-    Schema valSchema = SchemaBuilder.struct().field("data", 
dataSchema).build();
-
-    Struct id = new Struct(idSchema).put("key", 123L);
-    Struct data = new Struct(dataSchema).put("id", id);
-    Struct val = new Struct(valSchema).put("data", data);
-
-    Object result = Utilities.extractFromRecordValue(val, "data.id.key");
-    assertThat(result).isEqualTo(123L);
-  }
-
-  @Test
-  public void testExtractFromRecordValueStructNull() {
-    Schema valSchema = SchemaBuilder.struct().field("key", 
Schema.INT64_SCHEMA).build();
-    Struct val = new Struct(valSchema).put("key", 123L);
-
-    Object result = Utilities.extractFromRecordValue(val, "");
-    assertThat(result).isNull();
-
-    result = Utilities.extractFromRecordValue(val, "xkey");
-    assertThat(result).isNull();
-  }
-
-  @Test
-  public void testExtractFromRecordValueMap() {
-    Map<String, Object> val = ImmutableMap.of("key", 123L);
-    Object result = Utilities.extractFromRecordValue(val, "key");
-    assertThat(result).isEqualTo(123L);
-  }
-
-  @Test
-  public void testExtractFromRecordValueMapNested() {
-    Map<String, Object> id = ImmutableMap.of("key", 123L);
-    Map<String, Object> data = ImmutableMap.of("id", id);
-    Map<String, Object> val = ImmutableMap.of("data", data);
-
-    Object result = Utilities.extractFromRecordValue(val, "data.id.key");
-    assertThat(result).isEqualTo(123L);
-  }
-
-  @Test
-  public void testExtractFromRecordValueMapNull() {
-    Map<String, Object> val = ImmutableMap.of("key", 123L);
-
-    Object result = Utilities.extractFromRecordValue(val, "");
-    assertThat(result).isNull();
-
-    result = Utilities.extractFromRecordValue(val, "xkey");
-    assertThat(result).isNull();
-  }
 }
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
new file mode 100644
index 0000000000..e6ffefbd97
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+public class ChannelTestBase {
+  protected static final String SRC_TOPIC_NAME = "src-topic";
+  protected static final String CTL_TOPIC_NAME = "ctl-topic";
+  protected static final String CONNECT_CONSUMER_GROUP_ID = "cg-connect";
+  protected InMemoryCatalog catalog;
+  protected Table table;
+  protected IcebergSinkConfig config;
+  protected KafkaClientFactory clientFactory;
+  protected MockProducer<String, byte[]> producer;
+  protected MockConsumer<String, byte[]> consumer;
+  protected Admin admin;
+
+  private InMemoryCatalog initInMemoryCatalog() {
+    InMemoryCatalog inMemoryCatalog = new InMemoryCatalog();
+    inMemoryCatalog.initialize(null, ImmutableMap.of());
+    return inMemoryCatalog;
+  }
+
+  protected static final Namespace NAMESPACE = Namespace.of("db");
+  protected static final String TABLE_NAME = "tbl";
+  protected static final TableIdentifier TABLE_IDENTIFIER =
+      TableIdentifier.of(NAMESPACE, TABLE_NAME);
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.LongType.get()),
+          optional(2, "data", Types.StringType.get()),
+          required(3, "date", Types.StringType.get()));
+
+  protected static final String COMMIT_ID_SNAPSHOT_PROP = 
"kafka.connect.commit-id";
+  protected static final String OFFSETS_SNAPSHOT_PROP =
+      String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, 
CONNECT_CONSUMER_GROUP_ID);
+  protected static final String VALID_THROUGH_TS_SNAPSHOT_PROP = 
"kafka.connect.valid-through-ts";
+
+  @BeforeEach
+  @SuppressWarnings("deprecation")
+  public void before() {
+    catalog = initInMemoryCatalog();
+    catalog.createNamespace(NAMESPACE);
+    table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
+
+    config = mock(IcebergSinkConfig.class);
+    when(config.controlTopic()).thenReturn(CTL_TOPIC_NAME);
+    when(config.commitThreads()).thenReturn(1);
+    when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID);
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+
+    TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class);
+    when(partitionInfo.partition()).thenReturn(0);
+    TopicDescription topicDesc =
+        new TopicDescription(SRC_TOPIC_NAME, false, 
ImmutableList.of(partitionInfo));
+    DescribeTopicsResult describeResult = mock(DescribeTopicsResult.class);
+    when(describeResult.values())
+        .thenReturn(ImmutableMap.of(SRC_TOPIC_NAME, 
KafkaFuture.completedFuture(topicDesc)));
+
+    admin = mock(Admin.class);
+    when(admin.describeTopics(anyCollection())).thenReturn(describeResult);
+
+    producer = new MockProducer<>(false, new StringSerializer(), new 
ByteArraySerializer());
+    producer.initTransactions();
+
+    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+
+    clientFactory = mock(KafkaClientFactory.class);
+    when(clientFactory.createProducer(any())).thenReturn(producer);
+    when(clientFactory.createConsumer(any())).thenReturn(consumer);
+    when(clientFactory.createAdmin()).thenReturn(admin);
+  }
+
+  @AfterEach
+  public void after() throws IOException {
+    catalog.close();
+  }
+
+  protected void initConsumer() {
+    TopicPartition tp = new TopicPartition(CTL_TOPIC_NAME, 0);
+    consumer.rebalance(ImmutableList.of(tp));
+    consumer.updateBeginningOffsets(ImmutableMap.of(tp, 0L));
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java
new file mode 100644
index 0000000000..a9fe1ad099
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.Payload;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+public class CommitStateTest {
+  @Test
+  public void testIsCommitReady() {
+    TopicPartitionOffset tp = mock(TopicPartitionOffset.class);
+
+    CommitState commitState = new CommitState(mock(IcebergSinkConfig.class));
+    commitState.startNewCommit();
+
+    DataComplete payload1 = mock(DataComplete.class);
+    when(payload1.commitId()).thenReturn(commitState.currentCommitId());
+    when(payload1.assignments()).thenReturn(ImmutableList.of(tp, tp));
+
+    DataComplete payload2 = mock(DataComplete.class);
+    when(payload2.commitId()).thenReturn(commitState.currentCommitId());
+    when(payload2.assignments()).thenReturn(ImmutableList.of(tp));
+
+    DataComplete payload3 = mock(DataComplete.class);
+    when(payload3.commitId()).thenReturn(UUID.randomUUID());
+    when(payload3.assignments()).thenReturn(ImmutableList.of(tp));
+
+    commitState.addReady(wrapInEnvelope(payload1));
+    commitState.addReady(wrapInEnvelope(payload2));
+    commitState.addReady(wrapInEnvelope(payload3));
+
+    assertThat(commitState.isCommitReady(3)).isTrue();
+    assertThat(commitState.isCommitReady(4)).isFalse();
+  }
+
+  @Test
+  public void testGetValidThroughTs() {
+    DataComplete payload1 = mock(DataComplete.class);
+    TopicPartitionOffset tp1 = mock(TopicPartitionOffset.class);
+    OffsetDateTime ts1 = EventTestUtil.now();
+    when(tp1.timestamp()).thenReturn(ts1);
+
+    TopicPartitionOffset tp2 = mock(TopicPartitionOffset.class);
+    OffsetDateTime ts2 = ts1.plusSeconds(1);
+    when(tp2.timestamp()).thenReturn(ts2);
+    when(payload1.assignments()).thenReturn(ImmutableList.of(tp1, tp2));
+
+    DataComplete payload2 = mock(DataComplete.class);
+    TopicPartitionOffset tp3 = mock(TopicPartitionOffset.class);
+    OffsetDateTime ts3 = ts1.plusSeconds(2);
+    when(tp3.timestamp()).thenReturn(ts3);
+    when(payload2.assignments()).thenReturn(ImmutableList.of(tp3));
+
+    CommitState commitState = new CommitState(mock(IcebergSinkConfig.class));
+    commitState.startNewCommit();
+
+    commitState.addReady(wrapInEnvelope(payload1));
+    commitState.addReady(wrapInEnvelope(payload2));
+
+    assertThat(commitState.validThroughTs(false)).isEqualTo(ts1);
+    assertThat(commitState.validThroughTs(true)).isNull();
+
+    // null timestamp for one, so should not set a valid-through timestamp
+    DataComplete payload3 = mock(DataComplete.class);
+    TopicPartitionOffset tp4 = mock(TopicPartitionOffset.class);
+    when(tp4.timestamp()).thenReturn(null);
+    when(payload3.assignments()).thenReturn(ImmutableList.of(tp4));
+
+    commitState.addReady(wrapInEnvelope(payload3));
+
+    assertThat(commitState.validThroughTs(false)).isNull();
+    assertThat(commitState.validThroughTs(true)).isNull();
+  }
+
+  private Envelope wrapInEnvelope(Payload payload) {
+    Event event = mock(Event.class);
+    when(event.payload()).thenReturn(payload);
+    return new Envelope(event, 0, 0);
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java
new file mode 100644
index 0000000000..7c8ccf8ef6
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+public class CommitterImplTest {
+
+  @Test
+  public void testIsLeader() {
+    CommitterImpl committer = new CommitterImpl();
+
+    MemberAssignment assignment1 =
+        new MemberAssignment(
+            ImmutableSet.of(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 1)));
+    MemberDescription member1 =
+        new MemberDescription(null, Optional.empty(), null, null, assignment1);
+
+    MemberAssignment assignment2 =
+        new MemberAssignment(
+            ImmutableSet.of(new TopicPartition("topic2", 0), new 
TopicPartition("topic1", 1)));
+    MemberDescription member2 =
+        new MemberDescription(null, Optional.empty(), null, null, assignment2);
+
+    List<MemberDescription> members = ImmutableList.of(member1, member2);
+
+    List<TopicPartition> assignments =
+        ImmutableList.of(new TopicPartition("topic2", 1), new 
TopicPartition("topic1", 0));
+    assertThat(committer.isLeader(members, assignments)).isTrue();
+
+    assignments =
+        ImmutableList.of(new TopicPartition("topic2", 0), new 
TopicPartition("topic1", 1));
+    assertThat(committer.isLeader(members, assignments)).isFalse();
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java
new file mode 100644
index 0000000000..9c0b8122ae
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.connect.events.AvroUtil;
+import org.apache.iceberg.connect.events.CommitComplete;
+import org.apache.iceberg.connect.events.CommitToTable;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.PayloadType;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class CoordinatorTest extends ChannelTestBase {
+
+  @Test
+  public void testCommitAppend() {
+    Assertions.assertEquals(0, 
ImmutableList.copyOf(table.snapshots().iterator()).size());
+
+    OffsetDateTime ts = EventTestUtil.now();
+    UUID commitId =
+        coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), 
ImmutableList.of(), ts);
+    table.refresh();
+
+    assertThat(producer.history()).hasSize(3);
+    assertCommitTable(1, commitId, ts);
+    assertCommitComplete(2, commitId, ts);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(1, snapshots.size());
+
+    Snapshot snapshot = snapshots.get(0);
+    Assertions.assertEquals(DataOperations.APPEND, snapshot.operation());
+    Assertions.assertEquals(1, 
ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size());
+    Assertions.assertEquals(0, 
ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size());
+
+    Map<String, String> summary = snapshot.summary();
+    Assertions.assertEquals(commitId.toString(), 
summary.get(COMMIT_ID_SNAPSHOT_PROP));
+    Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP));
+    Assertions.assertEquals(ts.toString(), 
summary.get(VALID_THROUGH_TS_SNAPSHOT_PROP));
+  }
+
+  @Test
+  public void testCommitDelta() {
+    OffsetDateTime ts = EventTestUtil.now();
+    UUID commitId =
+        coordinatorTest(
+            ImmutableList.of(EventTestUtil.createDataFile()),
+            ImmutableList.of(EventTestUtil.createDeleteFile()),
+            ts);
+
+    assertThat(producer.history()).hasSize(3);
+    assertCommitTable(1, commitId, ts);
+    assertCommitComplete(2, commitId, ts);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(1, snapshots.size());
+
+    Snapshot snapshot = snapshots.get(0);
+    Assertions.assertEquals(DataOperations.OVERWRITE, snapshot.operation());
+    Assertions.assertEquals(1, 
ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size());
+    Assertions.assertEquals(1, 
ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size());
+
+    Map<String, String> summary = snapshot.summary();
+    Assertions.assertEquals(commitId.toString(), 
summary.get(COMMIT_ID_SNAPSHOT_PROP));
+    Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP));
+    Assertions.assertEquals(ts.toString(), 
summary.get(VALID_THROUGH_TS_SNAPSHOT_PROP));
+  }
+
+  @Test
+  public void testCommitNoFiles() {
+    OffsetDateTime ts = EventTestUtil.now();
+    UUID commitId = coordinatorTest(ImmutableList.of(), ImmutableList.of(), 
ts);
+
+    assertThat(producer.history()).hasSize(2);
+    assertCommitComplete(1, commitId, ts);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(0, snapshots.size());
+  }
+
+  @Test
+  public void testCommitError() {
+    // this spec isn't registered with the table
+    PartitionSpec badPartitionSpec =
+        PartitionSpec.builderFor(SCHEMA).withSpecId(1).identity("id").build();
+    DataFile badDataFile =
+        DataFiles.builder(badPartitionSpec)
+            .withPath(UUID.randomUUID() + ".parquet")
+            .withFormat(FileFormat.PARQUET)
+            .withFileSizeInBytes(100L)
+            .withRecordCount(5)
+            .build();
+
+    coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+
+    // no commit messages sent
+    assertThat(producer.history()).hasSize(1);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(0, snapshots.size());
+  }
+
+  private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) {
+    byte[] bytes = producer.history().get(idx).value();
+    Event commitTable = AvroUtil.decode(bytes);
+    assertThat(commitTable.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE);
+    CommitToTable commitToTablePayload = (CommitToTable) commitTable.payload();
+    assertThat(commitToTablePayload.commitId()).isEqualTo(commitId);
+    assertThat(commitToTablePayload.tableReference().identifier().toString())
+        .isEqualTo(TABLE_IDENTIFIER.toString());
+    assertThat(commitToTablePayload.validThroughTs()).isEqualTo(ts);
+  }
+
+  private void assertCommitComplete(int idx, UUID commitId, OffsetDateTime ts) 
{
+    byte[] bytes = producer.history().get(idx).value();
+    Event commitComplete = AvroUtil.decode(bytes);
+    assertThat(commitComplete.type()).isEqualTo(PayloadType.COMMIT_COMPLETE);
+    CommitComplete commitCompletePayload = (CommitComplete) 
commitComplete.payload();
+    assertThat(commitCompletePayload.commitId()).isEqualTo(commitId);
+    assertThat(commitCompletePayload.validThroughTs()).isEqualTo(ts);
+  }
+
+  private UUID coordinatorTest(
+      List<DataFile> dataFiles, List<DeleteFile> deleteFiles, OffsetDateTime 
ts) {
+    when(config.commitIntervalMs()).thenReturn(0);
+    when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+
+    SinkTaskContext context = mock(SinkTaskContext.class);
+    Coordinator coordinator =
+        new Coordinator(catalog, config, ImmutableList.of(), clientFactory, 
context);
+    coordinator.start();
+
+    // init consumer after subscribe()
+    initConsumer();
+
+    coordinator.process();
+
+    assertThat(producer.transactionCommitted()).isTrue();
+    assertThat(producer.history()).hasSize(1);
+
+    byte[] bytes = producer.history().get(0).value();
+    Event commitRequest = AvroUtil.decode(bytes);
+    assertThat(commitRequest.type()).isEqualTo(PayloadType.START_COMMIT);
+
+    UUID commitId = ((StartCommit) commitRequest.payload()).commitId();
+
+    Event commitResponse =
+        new Event(
+            config.connectGroupId(),
+            new DataWritten(
+                StructType.of(),
+                commitId,
+                new TableReference("catalog", ImmutableList.of("db"), "tbl"),
+                dataFiles,
+                deleteFiles));
+    bytes = AvroUtil.encode(commitResponse);
+    consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", 
bytes));
+
+    Event commitReady =
+        new Event(
+            config.connectGroupId(),
+            new DataComplete(
+                commitId, ImmutableList.of(new TopicPartitionOffset("topic", 
1, 1L, ts))));
+    bytes = AvroUtil.encode(commitReady);
+    consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", 
bytes));
+
+    when(config.commitIntervalMs()).thenReturn(0);
+
+    coordinator.process();
+
+    return commitId;
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java
new file mode 100644
index 0000000000..da0d881f89
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.jupiter.api.Test;
+
+public class CoordinatorThreadTest {
+
+  @Test
+  public void testRun() {
+    Coordinator coordinator = mock(Coordinator.class);
+    CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator);
+
+    coordinatorThread.start();
+
+    verify(coordinator, timeout(1000)).start();
+    verify(coordinator, timeout(1000).atLeast(1)).process();
+    verify(coordinator, times(0)).stop();
+    assertThat(coordinatorThread.isTerminated()).isFalse();
+
+    coordinatorThread.terminate();
+
+    verify(coordinator, timeout(1000)).stop();
+    assertThat(coordinatorThread.isTerminated()).isTrue();
+  }
+}
diff --git 
a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java
similarity index 95%
copy from 
kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
copy to 
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java
index 8f1f7a601f..8c3625b74a 100644
--- 
a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.connect.events;
+package org.apache.iceberg.connect.channel;
 
 import java.nio.ByteBuffer;
 import java.time.OffsetDateTime;
@@ -44,8 +44,7 @@ class EventTestUtil {
   static final Schema SCHEMA =
       new Schema(ImmutableList.of(Types.NestedField.required(1, "id", 
Types.LongType.get())));
 
-  static final PartitionSpec SPEC =
-      PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build();
+  static final PartitionSpec SPEC = 
PartitionSpec.builderFor(SCHEMA).identity("id").build();
 
   static final SortOrder ORDER =
       SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC, 
NullOrder.NULLS_FIRST).build();
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java
new file mode 100644
index 0000000000..577c28fe63
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.data.IcebergWriterResult;
+import org.apache.iceberg.connect.data.Offset;
+import org.apache.iceberg.connect.data.SinkWriter;
+import org.apache.iceberg.connect.data.SinkWriterResult;
+import org.apache.iceberg.connect.events.AvroUtil;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.PayloadType;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+public class WorkerTest extends ChannelTestBase {
+
+  @Test
+  public void testSave() {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = 
mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      IcebergWriterResult writeResult =
+          new IcebergWriterResult(
+              TableIdentifier.parse(TABLE_NAME),
+              ImmutableList.of(EventTestUtil.createDataFile()),
+              ImmutableList.of(),
+              StructType.of());
+
+      Map<TopicPartition, Offset> offsets =
+          ImmutableMap.of(topicPartition, new Offset(1L, EventTestUtil.now()));
+
+      SinkWriterResult sinkWriterResult =
+          new SinkWriterResult(ImmutableList.of(writeResult), offsets);
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite()).thenReturn(sinkWriterResult);
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // init consumer after subscribe()
+      initConsumer();
+
+      // save a record
+      Map<String, Object> value = ImmutableMap.of();
+      SinkRecord rec = new SinkRecord(SRC_TOPIC_NAME, 0, null, "key", null, 
value, 0L);
+      worker.save(ImmutableList.of(rec));
+
+      UUID commitId = UUID.randomUUID();
+      Event commitRequest = new Event(config.connectGroupId(), new 
StartCommit(commitId));
+      byte[] bytes = AvroUtil.encode(commitRequest);
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", 
bytes));
+
+      worker.process();
+
+      assertThat(producer.history()).hasSize(2);
+
+      Event event = AvroUtil.decode(producer.history().get(0).value());
+      assertThat(event.payload().type()).isEqualTo(PayloadType.DATA_WRITTEN);
+      DataWritten dataWritten = (DataWritten) event.payload();
+      assertThat(dataWritten.commitId()).isEqualTo(commitId);
+
+      event = AvroUtil.decode(producer.history().get(1).value());
+      assertThat(event.type()).isEqualTo(PayloadType.DATA_COMPLETE);
+      DataComplete dataComplete = (DataComplete) event.payload();
+      assertThat(dataComplete.commitId()).isEqualTo(commitId);
+      assertThat(dataComplete.assignments()).hasSize(1);
+      assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+    }
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
index 80adc7fc3e..ac44952a5c 100644
--- 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
@@ -73,7 +73,7 @@ public class BaseWriterTest {
 
   protected WriteResult writeTest(
       List<Record> rows, IcebergSinkConfig config, Class<?> 
expectedWriterClass) {
-    try (TaskWriter<Record> writer = Utilities.createTableWriter(table, 
"name", config)) {
+    try (TaskWriter<Record> writer = RecordUtils.createTableWriter(table, 
"name", config)) {
       assertThat(writer.getClass()).isEqualTo(expectedWriterClass);
 
       rows.forEach(
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java
new file mode 100644
index 0000000000..08e832256a
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+public class RecordUtilsTest {
+
+  @Test
+  public void testExtractFromRecordValueStruct() {
+    Schema valSchema = SchemaBuilder.struct().field("key", 
Schema.INT64_SCHEMA).build();
+    Struct val = new Struct(valSchema).put("key", 123L);
+    Object result = RecordUtils.extractFromRecordValue(val, "key");
+    assertThat(result).isEqualTo(123L);
+  }
+
+  @Test
+  public void testExtractFromRecordValueStructNested() {
+    Schema idSchema = SchemaBuilder.struct().field("key", 
Schema.INT64_SCHEMA).build();
+    Schema dataSchema = SchemaBuilder.struct().field("id", idSchema).build();
+    Schema valSchema = SchemaBuilder.struct().field("data", 
dataSchema).build();
+
+    Struct id = new Struct(idSchema).put("key", 123L);
+    Struct data = new Struct(dataSchema).put("id", id);
+    Struct val = new Struct(valSchema).put("data", data);
+
+    Object result = RecordUtils.extractFromRecordValue(val, "data.id.key");
+    assertThat(result).isEqualTo(123L);
+  }
+
+  @Test
+  public void testExtractFromRecordValueStructNull() {
+    Schema valSchema = SchemaBuilder.struct().field("key", 
Schema.INT64_SCHEMA).build();
+    Struct val = new Struct(valSchema).put("key", 123L);
+
+    Object result = RecordUtils.extractFromRecordValue(val, "");
+    assertThat(result).isNull();
+
+    result = RecordUtils.extractFromRecordValue(val, "xkey");
+    assertThat(result).isNull();
+  }
+
+  @Test
+  public void testExtractFromRecordValueMap() {
+    Map<String, Object> val = ImmutableMap.of("key", 123L);
+    Object result = RecordUtils.extractFromRecordValue(val, "key");
+    assertThat(result).isEqualTo(123L);
+  }
+
+  @Test
+  public void testExtractFromRecordValueMapNested() {
+    Map<String, Object> id = ImmutableMap.of("key", 123L);
+    Map<String, Object> data = ImmutableMap.of("id", id);
+    Map<String, Object> val = ImmutableMap.of("data", data);
+
+    Object result = RecordUtils.extractFromRecordValue(val, "data.id.key");
+    assertThat(result).isEqualTo(123L);
+  }
+
+  @Test
+  public void testExtractFromRecordValueMapNull() {
+    Map<String, Object> val = ImmutableMap.of("key", 123L);
+
+    Object result = RecordUtils.extractFromRecordValue(val, "");
+    assertThat(result).isNull();
+
+    result = RecordUtils.extractFromRecordValue(val, "xkey");
+    assertThat(result).isNull();
+  }
+}
diff --git 
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java
 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java
new file mode 100644
index 0000000000..be29ef1022
--- /dev/null
+++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class SchemaUpdateTest {
+
+  @Test
+  public void testAddColumn() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    updateConsumer.addColumn("parent", "name", Types.StringType.get());
+    assertThat(updateConsumer.addColumns()).hasSize(1);
+    assertThat(updateConsumer.updateTypes()).isEmpty();
+    assertThat(updateConsumer.makeOptionals()).isEmpty();
+
+    SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next();
+    assertThat(addColumn.parentName()).isEqualTo("parent");
+    assertThat(addColumn.name()).isEqualTo("name");
+    assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
+  }
+
+  @Test
+  public void testUpdateType() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    updateConsumer.updateType("name", Types.LongType.get());
+    assertThat(updateConsumer.addColumns()).isEmpty();
+
+    assertThat(updateConsumer.updateTypes()).hasSize(1);
+    assertThat(updateConsumer.makeOptionals()).isEmpty();
+
+    SchemaUpdate.UpdateType updateType = updateConsumer.updateTypes().iterator().next();
+    assertThat(updateType.name()).isEqualTo("name");
+    assertThat(updateType.type()).isEqualTo(Types.LongType.get());
+  }
+
+  @Test
+  public void testMakeOptional() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    updateConsumer.makeOptional("name");
+    assertThat(updateConsumer.addColumns()).isEmpty();
+
+    assertThat(updateConsumer.updateTypes()).isEmpty();
+    assertThat(updateConsumer.makeOptionals()).hasSize(1);
+
+    SchemaUpdate.MakeOptional makeOptional = updateConsumer.makeOptionals().iterator().next();
+    assertThat(makeOptional.name()).isEqualTo("name");
+  }
+}
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
new file mode 100644
index 0000000000..4a17b926fc
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SinkWriterTest {
+
+  private InMemoryCatalog catalog;
+
+  private static final Namespace NAMESPACE = Namespace.of("db");
+  private static final String TABLE_NAME = "tbl";
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
+  private static final Schema SCHEMA =
+      new Schema(
+          optional(1, "id", Types.LongType.get()),
+          optional(2, "data", Types.StringType.get()),
+          optional(3, "date", Types.StringType.get()));
+  private static final String ROUTE_FIELD = "fld";
+
+  @BeforeEach
+  public void before() {
+    catalog = initInMemoryCatalog();
+    catalog.createNamespace(NAMESPACE);
+    catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
+  }
+
+  @AfterEach
+  public void after() throws IOException {
+    catalog.close();
+  }
+
+  private InMemoryCatalog initInMemoryCatalog() {
+    InMemoryCatalog inMemoryCatalog = new InMemoryCatalog();
+    inMemoryCatalog.initialize(null, ImmutableMap.of());
+    return inMemoryCatalog;
+  }
+
+  @Test
+  public void testDefaultRoute() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    Map<String, Object> value = ImmutableMap.of();
+
+    List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+    assertThat(writerResults.size()).isEqualTo(1);
+    IcebergWriterResult writerResult = writerResults.get(0);
+    assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+  }
+
+  @Test
+  public void testDefaultNoRoute() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.tables()).thenReturn(ImmutableList.of());
+    Map<String, Object> value = ImmutableMap.of();
+
+    List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+    assertThat(writerResults.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testStaticRoute() {
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.routeRegex()).thenReturn(Pattern.compile("val"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+    when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+    Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD, "val");
+    List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+    assertThat(writerResults.size()).isEqualTo(1);
+    IcebergWriterResult writerResult = writerResults.get(0);
+    assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+  }
+
+  @Test
+  public void testStaticNoRoute() {
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.routeRegex()).thenReturn(Pattern.compile("val"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+    when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+    Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD, "foobar");
+    List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+    assertThat(writerResults.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testDynamicRoute() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.dynamicTablesEnabled()).thenReturn(true);
+    when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+    Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD, TABLE_IDENTIFIER.toString());
+
+    List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+    assertThat(writerResults.size()).isEqualTo(1);
+    IcebergWriterResult writerResult = writerResults.get(0);
+    assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+  }
+
+  @Test
+  public void testDynamicNoRoute() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.dynamicTablesEnabled()).thenReturn(true);
+    when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+    Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD, "db.foobar");
+
+    List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+    assertThat(writerResults.size()).isEqualTo(0);
+  }
+
+  private List<IcebergWriterResult> sinkWriterTest(
+      Map<String, Object> value, IcebergSinkConfig config) {
+    IcebergWriterResult writeResult =
+        new IcebergWriterResult(
+            TableIdentifier.parse(TABLE_NAME),
+            ImmutableList.of(mock(DataFile.class)),
+            ImmutableList.of(),
+            Types.StructType.of());
+    IcebergWriter writer = mock(IcebergWriter.class);
+    when(writer.complete()).thenReturn(ImmutableList.of(writeResult));
+
+    IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class);
+    when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer);
+
+    SinkWriter sinkWriter = new SinkWriter(catalog, config);
+
+    // save a record
+    Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+    SinkRecord rec =
+        new SinkRecord(
+            "topic",
+            1,
+            null,
+            "key",
+            null,
+            value,
+            100L,
+            now.toEpochMilli(),
+            TimestampType.LOG_APPEND_TIME);
+    sinkWriter.save(ImmutableList.of(rec));
+
+    SinkWriterResult result = sinkWriter.completeWrite();
+
+    Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1));
+    assertThat(offset).isNotNull();
+    assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset
+    assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));
+
+    return result.writerResults();
+  }
+}


Reply via email to