This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d8e7642d4e Kafka Connect: Commit coordination (#10351)
d8e7642d4e is described below
commit d8e7642d4e61b6d442c38b2cab1f631864c4e368
Author: Bryan Keller <[email protected]>
AuthorDate: Thu Jul 11 08:22:20 2024 -0700
Kafka Connect: Commit coordination (#10351)
* Kafka Connect: Commit coordination
* PR feedback
* Get partition assignment from consumer
* test fix
---
.../iceberg/connect/events/TableReference.java | 20 ++
.../iceberg/connect/events/EventTestUtil.java | 3 +-
.../org/apache/iceberg/connect/CatalogUtils.java | 98 +++++++
.../{data/NoOpWriter.java => Committer.java} | 24 +-
.../RecordWriter.java => CommitterFactory.java} | 16 +-
.../apache/iceberg/connect/IcebergSinkConfig.java | 23 +-
.../iceberg/connect/IcebergSinkConnector.java | 4 +-
.../apache/iceberg/connect/IcebergSinkTask.java | 107 +++++++
.../apache/iceberg/connect/channel/Channel.java | 167 +++++++++++
.../iceberg/connect/channel/CommitState.java | 167 +++++++++++
.../iceberg/connect/channel/CommitterImpl.java | 128 +++++++++
.../iceberg/connect/channel/Coordinator.java | 314 +++++++++++++++++++++
.../iceberg/connect/channel/CoordinatorThread.java | 69 +++++
.../NoOpWriter.java => channel/Envelope.java} | 33 ++-
.../connect/channel/KafkaClientFactory.java | 68 +++++
.../apache/iceberg/connect/channel/KafkaUtils.java | 66 +++++
.../NotRunningException.java} | 16 +-
.../org/apache/iceberg/connect/channel/Worker.java | 126 +++++++++
.../apache/iceberg/connect/data/IcebergWriter.java | 14 +-
.../iceberg/connect/data/IcebergWriterFactory.java | 7 +-
...{WriterResult.java => IcebergWriterResult.java} | 4 +-
.../apache/iceberg/connect/data/NoOpWriter.java | 5 +-
.../data/{RecordWriter.java => Offset.java} | 35 ++-
.../data/{Utilities.java => RecordUtils.java} | 78 +----
.../apache/iceberg/connect/data/RecordWriter.java | 4 +-
.../apache/iceberg/connect/data/SinkWriter.java | 140 +++++++++
.../{NoOpWriter.java => SinkWriterResult.java} | 26 +-
.../UtilitiesTest.java => CatalogUtilsTest.java} | 74 +----
.../iceberg/connect/channel/ChannelTestBase.java | 131 +++++++++
.../iceberg/connect/channel/CommitStateTest.java | 107 +++++++
.../iceberg/connect/channel/CommitterImplTest.java | 60 ++++
.../iceberg/connect/channel/CoordinatorTest.java | 213 ++++++++++++++
.../connect/channel/CoordinatorThreadTest.java | 48 ++++
.../iceberg/connect/channel}/EventTestUtil.java | 5 +-
.../apache/iceberg/connect/channel/WorkerTest.java | 116 ++++++++
.../iceberg/connect/data/BaseWriterTest.java | 2 +-
.../iceberg/connect/data/RecordUtilsTest.java | 93 ++++++
.../iceberg/connect/data/SchemaUpdateTest.java | 68 +++++
.../iceberg/connect/data/SinkWriterTest.java | 210 ++++++++++++++
39 files changed, 2628 insertions(+), 261 deletions(-)
diff --git
a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
index 50eaa10504..f30eac8924 100644
---
a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
+++
b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.connect.events;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -120,4 +121,23 @@ public class TableReference implements IndexedRecord {
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
}
+
+ /**
+ * Two references are equal when their catalog, namespace, and name are all equal (and they are
+ * of the same class), which lets them serve as map keys, e.g. when CommitState groups commit
+ * responses by table reference.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableReference that = (TableReference) o;
+ return Objects.equals(catalog, that.catalog)
+ && Objects.equals(namespace, that.namespace)
+ && Objects.equals(name, that.name);
+ }
+
+ // Hashes the same three fields compared in equals(), keeping the two consistent.
+ @Override
+ public int hashCode() {
+ return Objects.hash(catalog, namespace, name);
+ }
}
diff --git
a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
index 8f1f7a601f..48e268bf05 100644
---
a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
+++
b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
@@ -44,8 +44,7 @@ class EventTestUtil {
static final Schema SCHEMA =
new Schema(ImmutableList.of(Types.NestedField.required(1, "id",
Types.LongType.get())));
- static final PartitionSpec SPEC =
- PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build();
+ static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("id").build();
static final SortOrder ORDER =
SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC,
NullOrder.NULLS_FIRST).build();
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java
new file mode 100644
index 0000000000..a3c6358e1b
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CatalogUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.common.DynMethods.BoundMethod;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Loads the Iceberg catalog configured for the sink, with an optional Hadoop configuration. */
+class CatalogUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CatalogUtils.class.getName());
+ // Hadoop config files added from config.hadoopConfDir(), each only if the file exists
+ private static final List<String> HADOOP_CONF_FILES =
+ ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+ /**
+ * Builds the catalog from the configured catalog name and properties. The Hadoop config passed
+ * along may be null when Hadoop is not on the classpath or the config could not be created.
+ */
+ static Catalog loadCatalog(IcebergSinkConfig config) {
+ return CatalogUtil.buildIcebergCatalog(
+ config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
+ }
+
+ /**
+ * Creates a Hadoop configuration object, or returns null if no Hadoop config class is found or
+ * creation fails (failures are logged, not thrown). The returned object has the files from
+ * HADOOP_CONF_FILES added as resources, followed by every entry of config.hadoopProps() set on it.
+ */
+ // use reflection here to avoid requiring Hadoop as a dependency
+ private static Object loadHadoopConfig(IcebergSinkConfig config) {
+ // HdfsConfiguration is listed first, with Configuration as the fallback
+ Class<?> configClass =
+ DynClasses.builder()
+ .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
+ .impl("org.apache.hadoop.conf.Configuration")
+ .orNull()
+ .build();
+
+ if (configClass == null) {
+ LOG.info("Hadoop not found on classpath, not creating Hadoop config");
+ return null;
+ }
+
+ try {
+ // no-arg constructor, then bind addResource(URL) and set(String, String) to the instance
+ Object result =
DynConstructors.builder().hiddenImpl(configClass).build().newInstance();
+ BoundMethod addResourceMethod =
+ DynMethods.builder("addResource").impl(configClass,
URL.class).build(result);
+ BoundMethod setMethod =
+ DynMethods.builder("set").impl(configClass, String.class,
String.class).build(result);
+
+ // load any config files in the specified config directory
+ String hadoopConfDir = config.hadoopConfDir();
+ if (hadoopConfDir != null) {
+ HADOOP_CONF_FILES.forEach(
+ confFile -> {
+ Path path = Paths.get(hadoopConfDir, confFile);
+ if (Files.exists(path)) {
+ // toURL() may throw MalformedURLException; that file is logged and skipped
+ try {
+ addResourceMethod.invoke(path.toUri().toURL());
+ } catch (IOException e) {
+ LOG.warn("Error adding Hadoop resource {}, resource was not
added", path, e);
+ }
+ }
+ });
+ }
+
+ // set any Hadoop properties specified in the sink config
+ config.hadoopProps().forEach(setMethod::invoke);
+
+ LOG.info("Hadoop config initialized: {}", configClass.getName());
+ return result;
+ } catch (Exception e) {
+ // best effort: any reflection failure falls through to returning null
+ LOG.warn(
+ "Hadoop found on classpath but could not create config, proceeding
without config", e);
+ }
+ return null;
+ }
+
+ private CatalogUtils() {}
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java
similarity index 71%
copy from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
copy to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java
index 64ca44f032..edc217d1b0 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/Committer.java
@@ -16,25 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect;
-import java.util.List;
+import java.util.Collection;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
-class NoOpWriter implements RecordWriter {
- @Override
- public void write(SinkRecord record) {
- // NO-OP
- }
+/**
+ * Receives sink records from {@link IcebergSinkTask} and is responsible for getting them written
+ * and committed to Iceberg.
+ */
+public interface Committer {
+ /** Starts the committer with the loaded catalog, the sink config, and the task context. */
+ void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext
context);
- @Override
- public List<WriterResult> complete() {
- // NO-OP
- return null;
- }
+ /** Stops the committer; IcebergSinkTask calls this on close(partitions) and stop(). */
+ void stop();
- @Override
- public void close() {
- // NO-OP
- }
+ /**
+ * Hands records to the committer. May be called with {@code null}: IcebergSinkTask.flush()
+ * invokes save(null).
+ */
+ void save(Collection<SinkRecord> sinkRecords);
}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
similarity index 76%
copy from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
copy to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
index 0b4d7566ea..18ff118c77 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/CommitterFactory.java
@@ -16,16 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect;
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.iceberg.connect.channel.CommitterImpl;
-public interface RecordWriter extends Cloneable {
+/** Creates the {@link Committer} used by the sink task. */
+class CommitterFactory {
+ // The config argument is currently unused: a CommitterImpl is always returned.
+ static Committer createCommitter(IcebergSinkConfig config) {
+ return new CommitterImpl();
+ }
- void write(SinkRecord record);
-
- List<WriterResult> complete();
-
- void close();
+ // static factory only; not instantiable
+ private CommitterFactory() {}
}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index e64e183089..aed11ab0b1 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -80,7 +80,6 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
- private static final String CONTROL_GROUP_ID_PROP =
"iceberg.control.group-id";
private static final String COMMIT_INTERVAL_MS_PROP =
"iceberg.control.commit.interval-ms";
private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000;
private static final String COMMIT_TIMEOUT_MS_PROP =
"iceberg.control.commit.timeout-ms";
@@ -104,11 +103,7 @@ public class IcebergSinkConfig extends AbstractConfig {
public static final ConfigDef CONFIG_DEF = newConfigDef();
+ /** Returns the Iceberg build version; IcebergSinkTask.version() reports this value. */
public static String version() {
- String kcVersion =
IcebergSinkConfig.class.getPackage().getImplementationVersion();
- if (kcVersion == null) {
- kcVersion = "unknown";
- }
- return IcebergBuild.version() + "-kc-" + kcVersion;
+ return IcebergBuild.version();
}
private static ConfigDef newConfigDef() {
@@ -185,12 +180,6 @@ public class IcebergSinkConfig extends AbstractConfig {
DEFAULT_CONTROL_TOPIC,
Importance.MEDIUM,
"Name of the control topic");
- configDef.define(
- CONTROL_GROUP_ID_PROP,
- ConfigDef.Type.STRING,
- null,
- Importance.MEDIUM,
- "Name of the consumer group to store offsets");
configDef.define(
CONNECT_GROUP_ID_PROP,
ConfigDef.Type.STRING,
@@ -370,16 +359,6 @@ public class IcebergSinkConfig extends AbstractConfig {
return getString(CONTROL_TOPIC_PROP);
}
- public String controlGroupId() {
- String result = getString(CONTROL_GROUP_ID_PROP);
- if (result != null) {
- return result;
- }
- String connectorName = connectorName();
- Preconditions.checkNotNull(connectorName, "Connector name cannot be null");
- return DEFAULT_CONTROL_GROUP_PREFIX + connectorName;
- }
-
public String connectGroupId() {
String result = getString(CONNECT_GROUP_ID_PROP);
if (result != null) {
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
index 8be8518f44..be1f9a50b8 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
@@ -44,9 +44,7 @@ public class IcebergSinkConnector extends SinkConnector {
+ /** Returns {@link IcebergSinkTask} as the task implementation for this connector. */
@Override
public Class<? extends Task> taskClass() {
- // FIXME: update this when the connector channel is added
- // return IcebergSinkTask.class;
- return null;
+ return IcebergSinkTask.class;
}
@Override
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java
new file mode 100644
index 0000000000..460b18fd7f
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Connect sink task for Iceberg. On {@link #open} it loads the catalog and creates a
+ * {@link Committer}; records passed to {@link #put} are forwarded to that committer.
+ */
+public class IcebergSinkTask extends SinkTask {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergSinkTask.class);
+
+ private IcebergSinkConfig config;
+ // catalog and committer are non-null only between open() and close()
+ private Catalog catalog;
+ private Committer committer;
+
+ @Override
+ public String version() {
+ return IcebergSinkConfig.version();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ // only parses the config; the catalog and committer are created in open()
+ this.config = new IcebergSinkConfig(props);
+ }
+
+ /**
+ * Loads the catalog and starts a new committer. The partitions argument is not used.
+ *
+ * <p>NOTE(review): throws if a previous catalog/committer was not released by close(); confirm
+ * this holds when open() can be called for newly assigned partitions without a prior close().
+ */
+ @Override
+ public void open(Collection<TopicPartition> partitions) {
+ Preconditions.checkArgument(catalog == null, "Catalog already open");
+ Preconditions.checkArgument(committer == null, "Committer already open");
+
+ catalog = CatalogUtils.loadCatalog(config);
+ committer = CommitterFactory.createCommitter(config);
+ committer.start(catalog, config, context);
+ }
+
+ // Tears down the catalog and committer regardless of which partitions were revoked.
+ @Override
+ public void close(Collection<TopicPartition> partitions) {
+ close();
+ }
+
+ /**
+ * Stops the committer, then closes the catalog if it is AutoCloseable (errors closing the catalog
+ * are logged and ignored). Does nothing for components that are already null.
+ *
+ * <p>NOTE(review): an exception from committer.stop() propagates before the catalog is closed.
+ */
+ private void close() {
+ if (committer != null) {
+ committer.stop();
+ committer = null;
+ }
+
+ if (catalog != null) {
+ if (catalog instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) catalog).close();
+ } catch (Exception e) {
+ LOG.warn("An error occurred closing catalog instance, ignoring...",
e);
+ }
+ }
+ catalog = null;
+ }
+ }
+
+ // Forwards the batch to the committer; fails if open() has not been called.
+ @Override
+ public void put(Collection<SinkRecord> sinkRecords) {
+ Preconditions.checkNotNull(committer, "Committer wasn't initialized");
+ committer.save(sinkRecords);
+ }
+
+ // Calls save(null), i.e. with no records. NOTE(review): presumably gives the committer a chance
+ // to do pending work outside of put(); confirm against CommitterImpl.
+ @Override
+ public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+ Preconditions.checkNotNull(committer, "Committer wasn't initialized");
+ committer.save(null);
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndMetadata> preCommit(
+ Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+ // offset commit is handled by the worker
+ // (an empty map means the Connect framework commits no offsets for this task)
+ return ImmutableMap.of();
+ }
+
+ // Releases the catalog and committer, same as close(partitions).
+ @Override
+ public void stop() {
+ close();
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
new file mode 100644
index 0000000000..993fcf67c9
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.Offset;
+import org.apache.iceberg.connect.events.AvroUtil;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for participants that exchange {@link Event}s over the control topic. Each instance
+ * creates its own transactional producer, consumer, and admin client from the given
+ * {@link KafkaClientFactory}, and closes all three in {@link #stop()}.
+ */
+abstract class Channel {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Channel.class);
+
+ private final String controlTopic;
+ // only events whose groupId equals this value are passed to receive()
+ private final String connectGroupId;
+ private final Producer<String, byte[]> producer;
+ private final Consumer<String, byte[]> consumer;
+ private final SinkTaskContext context;
+ // NOTE(review): private and only created/closed in this class; it is not otherwise used here
+ private final Admin admin;
+ // control-topic partition -> next offset to consume (last seen record offset + 1)
+ private final Map<Integer, Long> controlTopicOffsets = Maps.newHashMap();
+ // random per-instance ID used as the record key for everything this channel sends
+ private final String producerId;
+
+ /**
+ * @param name prefix of the producer's transactional ID (suffixed with
+ * config.transactionalSuffix())
+ * @param consumerGroupId consumer group used by the control-topic consumer
+ * @param config source of the control topic name, connect group ID, and transactional suffix
+ * @param clientFactory creates the producer, consumer, and admin client
+ * @param context sink task context, used for consumer group metadata when committing offsets
+ */
+ Channel(
+ String name,
+ String consumerGroupId,
+ IcebergSinkConfig config,
+ KafkaClientFactory clientFactory,
+ SinkTaskContext context) {
+ this.controlTopic = config.controlTopic();
+ this.connectGroupId = config.connectGroupId();
+ this.context = context;
+
+ String transactionalId = name + config.transactionalSuffix();
+ this.producer = clientFactory.createProducer(transactionalId);
+ this.consumer = clientFactory.createConsumer(consumerGroupId);
+ this.admin = clientFactory.createAdmin();
+
+ this.producerId = UUID.randomUUID().toString();
+ }
+
+ /** Sends a single event in its own transaction, without committing any source offsets. */
+ protected void send(Event event) {
+ send(ImmutableList.of(event), ImmutableMap.of());
+ }
+
+ /**
+ * Sends the events to the control topic and, when {@code sourceOffsets} is non-empty, adds those
+ * offsets to the same producer transaction using the group metadata from
+ * KafkaUtils.consumerGroupMetadata(context). On failure the transaction is aborted (abort
+ * errors are only logged) and the original exception is rethrown.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ protected void send(List<Event> events, Map<TopicPartition, Offset>
sourceOffsets) {
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Maps.newHashMap();
+ sourceOffsets.forEach((k, v) -> offsetsToCommit.put(k, new
OffsetAndMetadata(v.offset())));
+
+ List<ProducerRecord<String, byte[]>> recordList =
+ events.stream()
+ .map(
+ event -> {
+ LOG.info("Sending event of type: {}", event.type().name());
+ byte[] data = AvroUtil.encode(event);
+ // key by producer ID to keep event order
+ return new ProducerRecord<>(controlTopic, producerId, data);
+ })
+ .collect(Collectors.toList());
+
+ // one transaction at a time on this producer, in case send() is called from several threads
+ synchronized (producer) {
+ producer.beginTransaction();
+ try {
+ // NOTE: we shouldn't call get() on the future in a transactional
context,
+ // see docs for org.apache.kafka.clients.producer.KafkaProducer
+ recordList.forEach(producer::send);
+ if (!sourceOffsets.isEmpty()) {
+ producer.sendOffsetsToTransaction(
+ offsetsToCommit, KafkaUtils.consumerGroupMetadata(context));
+ }
+ producer.commitTransaction();
+ } catch (Exception e) {
+ try {
+ producer.abortTransaction();
+ } catch (Exception ex) {
+ LOG.warn("Error aborting producer transaction", ex);
+ }
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Handles one event addressed to this connect group. Returns true if the event was handled; in
+ * this class the return value only decides whether a log line is written.
+ */
+ protected abstract boolean receive(Envelope envelope);
+
+ /**
+ * Polls the control topic repeatedly until a poll returns no records. Offsets are tracked for
+ * every record, including events for other connect groups, which are otherwise skipped.
+ */
+ protected void consumeAvailable(Duration pollDuration) {
+ ConsumerRecords<String, byte[]> records = consumer.poll(pollDuration);
+ while (!records.isEmpty()) {
+ records.forEach(
+ record -> {
+ // the consumer stores the offsets that corresponds to the next
record to consume,
+ // so increment the record offset by one
+ controlTopicOffsets.put(record.partition(), record.offset() + 1);
+
+ Event event = AvroUtil.decode(record.value());
+
+ if (event.groupId().equals(connectGroupId)) {
+ LOG.debug("Received event of type: {}", event.type().name());
+ if (receive(new Envelope(event, record.partition(),
record.offset()))) {
+ LOG.info("Handled event of type: {}", event.type().name());
+ }
+ }
+ });
+ records = consumer.poll(pollDuration);
+ }
+ }
+
+ /** Returns the live, mutable offset map (not a copy). */
+ protected Map<Integer, Long> controlTopicOffsets() {
+ return controlTopicOffsets;
+ }
+
+ /** Synchronously commits the tracked control-topic offsets through this channel's consumer. */
+ protected void commitConsumerOffsets() {
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Maps.newHashMap();
+ controlTopicOffsets()
+ .forEach(
+ (k, v) ->
+ offsetsToCommit.put(new TopicPartition(controlTopic, k), new
OffsetAndMetadata(v)));
+ consumer.commitSync(offsetsToCommit);
+ }
+
+ /** Subscribes to the control topic and drains whatever is already available. */
+ void start() {
+ consumer.subscribe(ImmutableList.of(controlTopic));
+
+ // initial poll with longer duration so the consumer will initialize...
+ consumeAvailable(Duration.ofSeconds(1));
+ }
+
+ /**
+ * Closes the producer, consumer, and admin client, in that order.
+ *
+ * <p>NOTE(review): if an earlier close() throws, the remaining clients are not closed.
+ */
+ void stop() {
+ LOG.info("Channel stopping");
+ producer.close();
+ consumer.close();
+ admin.close();
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java
new file mode 100644
index 0000000000..6cad33c3e3
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.time.OffsetDateTime;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bookkeeping for commit cycles: buffers {@link DataWritten} responses and {@link DataComplete}
+ * ready messages, tracks the current commit ID, and times the commit interval and commit timeout
+ * using values from {@link IcebergSinkConfig}.
+ */
+class CommitState {
+ private static final Logger LOG = LoggerFactory.getLogger(CommitState.class);
+
+ // envelopes carrying DataWritten payloads; cleared only by clearResponses()
+ private final List<Envelope> commitBuffer = Lists.newArrayList();
+ // DataComplete payloads; cleared by endCurrentCommit()
+ private final List<DataComplete> readyBuffer = Lists.newArrayList();
+ // millis of the last startNewCommit() (or of the first interval check); 0 means not started
+ private long startTime;
+ // ID of the commit in progress, or null when no commit is in progress
+ private UUID currentCommitId;
+ private final IcebergSinkConfig config;
+
+ CommitState(IcebergSinkConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Buffers a response expected to carry a {@link DataWritten} payload (it is cast as such here and
+ * in tableCommitMap). The response is kept even when no commit is in progress; that case is
+ * only logged as a warning.
+ */
+ void addResponse(Envelope envelope) {
+ commitBuffer.add(envelope);
+ if (!isCommitInProgress()) {
+ DataWritten dataWritten = (DataWritten) envelope.event().payload();
+ LOG.warn(
+ "Received commit response when no commit in progress, this can
happen during recovery. Commit ID: {}",
+ dataWritten.commitId());
+ }
+ }
+
+ /**
+ * Buffers the {@link DataComplete} payload of a ready message. It is kept even when no commit is
+ * in progress; that case is only logged as a warning.
+ */
+ void addReady(Envelope envelope) {
+ DataComplete dataComplete = (DataComplete) envelope.event().payload();
+ readyBuffer.add(dataComplete);
+ if (!isCommitInProgress()) {
+ LOG.warn(
+ "Received commit ready when no commit in progress, this can happen
during recovery. Commit ID: {}",
+ dataComplete.commitId());
+ }
+ }
+
+ /** Returns the ID of the commit in progress, or null if none. */
+ UUID currentCommitId() {
+ return currentCommitId;
+ }
+
+ /** True between startNewCommit() and endCurrentCommit(). */
+ boolean isCommitInProgress() {
+ return currentCommitId != null;
+ }
+
+ /**
+ * Returns true when no commit is in progress and at least commitIntervalMs has elapsed since the
+ * clock was last started. If the clock was never started, this call starts it, so the first
+ * interval is measured from here.
+ */
+ boolean isCommitIntervalReached() {
+ if (startTime == 0) {
+ startTime = System.currentTimeMillis();
+ }
+
+ return (!isCommitInProgress()
+ && System.currentTimeMillis() - startTime >=
config.commitIntervalMs());
+ }
+
+ /** Begins a new commit with a fresh random ID and restarts the interval/timeout clock. */
+ void startNewCommit() {
+ currentCommitId = UUID.randomUUID();
+ startTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Ends the current commit: clears the ready buffer and the commit ID. Buffered responses are not
+ * touched; clearResponses() discards those.
+ */
+ void endCurrentCommit() {
+ readyBuffer.clear();
+ currentCommitId = null;
+ }
+
+ /** Discards all buffered responses. */
+ void clearResponses() {
+ commitBuffer.clear();
+ }
+
+ /** Returns true (and logs) when a commit is in progress and commitTimeoutMs has been exceeded. */
+ boolean isCommitTimedOut() {
+ if (!isCommitInProgress()) {
+ return false;
+ }
+
+ if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
+ LOG.info("Commit timeout reached. Commit ID: {}", currentCommitId);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Returns true when ready messages for the current commit cover at least
+ * {@code expectedPartitionCount} partitions. Each ready message contributes the size of its
+ * assignment list; messages for other commit IDs are ignored.
+ */
+ boolean isCommitReady(int expectedPartitionCount) {
+ if (!isCommitInProgress()) {
+ return false;
+ }
+
+ int receivedPartitionCount =
+ readyBuffer.stream()
+ .filter(payload -> payload.commitId().equals(currentCommitId))
+ .mapToInt(payload -> payload.assignments().size())
+ .sum();
+
+ if (receivedPartitionCount >= expectedPartitionCount) {
+ LOG.info(
+ "Commit {} ready, received responses for all {} partitions",
+ currentCommitId,
+ receivedPartitionCount);
+ return true;
+ }
+
+ LOG.info(
+ "Commit {} not ready, received responses for {} of {} partitions,
waiting for more",
+ currentCommitId,
+ receivedPartitionCount,
+ expectedPartitionCount);
+
+ return false;
+ }
+
+ /**
+ * Groups all buffered responses by the table reference of their DataWritten payload (relies on
+ * TableReference equality). Responses are not filtered by commit ID.
+ */
+ Map<TableReference, List<Envelope>> tableCommitMap() {
+ return commitBuffer.stream()
+ .collect(
+ Collectors.groupingBy(
+ envelope -> ((DataWritten)
envelope.event().payload()).tableReference()));
+ }
+
+ /**
+ * Returns the earliest assignment timestamp across all buffered ready messages. Returns null when
+ * this is a partial commit, when any assignment lacks a timestamp, or when there are no
+ * assignments at all.
+ *
+ * <p>NOTE(review): unlike isCommitReady, this does not filter ready messages by commit ID.
+ */
+ OffsetDateTime validThroughTs(boolean partialCommit) {
+ boolean hasValidThroughTs =
+ !partialCommit
+ && readyBuffer.stream()
+ .flatMap(event -> event.assignments().stream())
+ .allMatch(offset -> offset.timestamp() != null);
+
+ OffsetDateTime result;
+ if (hasValidThroughTs) {
+ result =
+ readyBuffer.stream()
+ .flatMap(event -> event.assignments().stream())
+ .map(TopicPartitionOffset::timestamp)
+ .min(Comparator.naturalOrder())
+ .orElse(null);
+ } else {
+ result = null;
+ }
+ return result;
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
new file mode 100644
index 0000000000..53b7b76e8e
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.Committer;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.SinkWriter;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommitterImpl implements Committer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CommitterImpl.class);
+
+ private CoordinatorThread coordinatorThread;
+ private Worker worker;
+
+ static class TopicPartitionComparator implements Comparator<TopicPartition> {
+
+ @Override
+ public int compare(TopicPartition o1, TopicPartition o2) {
+ int result = o1.topic().compareTo(o2.topic());
+ if (result == 0) {
+ result = Integer.compare(o1.partition(), o2.partition());
+ }
+ return result;
+ }
+ }
+
+ @Override
+ public void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext
context) {
+ KafkaClientFactory clientFactory = new
KafkaClientFactory(config.kafkaProps());
+
+ ConsumerGroupDescription groupDesc;
+ try (Admin admin = clientFactory.createAdmin()) {
+ groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(),
admin);
+ }
+
+ if (groupDesc.state() == ConsumerGroupState.STABLE) {
+ Collection<MemberDescription> members = groupDesc.members();
+ Set<TopicPartition> partitions = context.assignment();
+ if (isLeader(members, partitions)) {
+ LOG.info("Task elected leader, starting commit coordinator");
+ Coordinator coordinator = new Coordinator(catalog, config, members,
clientFactory, context);
+ coordinatorThread = new CoordinatorThread(coordinator);
+ coordinatorThread.start();
+ }
+ }
+
+ LOG.info("Starting commit worker");
+ SinkWriter sinkWriter = new SinkWriter(catalog, config);
+ worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+ }
+
+ @Override
+ public void save(Collection<SinkRecord> sinkRecords) {
+ if (sinkRecords != null && !sinkRecords.isEmpty()) {
+ worker.save(sinkRecords);
+ }
+ processControlEvents();
+ }
+
+ @Override
+ public void stop() {
+ if (worker != null) {
+ worker.stop();
+ worker = null;
+ }
+
+ if (coordinatorThread != null) {
+ coordinatorThread.terminate();
+ coordinatorThread = null;
+ }
+ }
+
+ @VisibleForTesting
+ boolean isLeader(Collection<MemberDescription> members,
Collection<TopicPartition> partitions) {
+ // there should only be one task assigned partition 0 of the first topic,
+ // so elect that one the leader
+ TopicPartition firstTopicPartition =
+ members.stream()
+ .flatMap(member -> member.assignment().topicPartitions().stream())
+ .min(new TopicPartitionComparator())
+ .orElseThrow(
+ () -> new ConnectException("No partitions assigned, cannot
determine leader"));
+
+ return partitions.contains(firstTopicPartition);
+ }
+
+ private void processControlEvents() {
+ if (coordinatorThread != null && coordinatorThread.isTerminated()) {
+ throw new NotRunningException("Coordinator unexpectedly terminated");
+ }
+ if (worker != null) {
+ worker.process();
+ }
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
new file mode 100644
index 0000000000..7274f77e0c
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.CommitComplete;
+import org.apache.iceberg.connect.events.CommitToTable;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class Coordinator extends Channel {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final String COMMIT_ID_SNAPSHOT_PROP =
"kafka.connect.commit-id";
+ private static final String VALID_THROUGH_TS_SNAPSHOT_PROP =
"kafka.connect.valid-through-ts";
+ private static final Duration POLL_DURATION = Duration.ofSeconds(1);
+
+ private final Catalog catalog;
+ private final IcebergSinkConfig config;
+ private final int totalPartitionCount;
+ private final String snapshotOffsetsProp;
+ private final ExecutorService exec;
+ private final CommitState commitState;
+
+ Coordinator(
+ Catalog catalog,
+ IcebergSinkConfig config,
+ Collection<MemberDescription> members,
+ KafkaClientFactory clientFactory,
+ SinkTaskContext context) {
+ // pass consumer group ID to which we commit low watermark offsets
+ super("coordinator", config.connectGroupId() + "-coord", config,
clientFactory, context);
+
+ this.catalog = catalog;
+ this.config = config;
+ this.totalPartitionCount =
+ members.stream().mapToInt(desc ->
desc.assignment().topicPartitions().size()).sum();
+ this.snapshotOffsetsProp =
+ String.format(
+ "kafka.connect.offsets.%s.%s", config.controlTopic(),
config.connectGroupId());
+ this.exec = ThreadPools.newWorkerPool("iceberg-committer",
config.commitThreads());
+ this.commitState = new CommitState(config);
+ }
+
+ void process() {
+ if (commitState.isCommitIntervalReached()) {
+ // send out begin commit
+ commitState.startNewCommit();
+ Event event =
+ new Event(config.connectGroupId(), new
StartCommit(commitState.currentCommitId()));
+ send(event);
+ LOG.info("Commit {} initiated", commitState.currentCommitId());
+ }
+
+ consumeAvailable(POLL_DURATION);
+
+ if (commitState.isCommitTimedOut()) {
+ commit(true);
+ }
+ }
+
+ @Override
+ protected boolean receive(Envelope envelope) {
+ switch (envelope.event().payload().type()) {
+ case DATA_WRITTEN:
+ commitState.addResponse(envelope);
+ return true;
+ case DATA_COMPLETE:
+ commitState.addReady(envelope);
+ if (commitState.isCommitReady(totalPartitionCount)) {
+ commit(false);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private void commit(boolean partialCommit) {
+ try {
+ doCommit(partialCommit);
+ } catch (Exception e) {
+ LOG.warn("Commit failed, will try again next cycle", e);
+ } finally {
+ commitState.endCurrentCommit();
+ }
+ }
+
+ private void doCommit(boolean partialCommit) {
+ Map<TableReference, List<Envelope>> commitMap =
commitState.tableCommitMap();
+
+ String offsetsJson = offsetsJson();
+ OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit);
+
+ Tasks.foreach(commitMap.entrySet())
+ .executeWith(exec)
+ .stopOnFailure()
+ .run(
+ entry -> {
+ commitToTable(entry.getKey(), entry.getValue(), offsetsJson,
validThroughTs);
+ });
+
+ // we should only get here if all tables committed successfully...
+ commitConsumerOffsets();
+ commitState.clearResponses();
+
+ Event event =
+ new Event(
+ config.connectGroupId(),
+ new CommitComplete(commitState.currentCommitId(), validThroughTs));
+ send(event);
+
+ LOG.info(
+ "Commit {} complete, committed to {} table(s), valid-through {}",
+ commitState.currentCommitId(),
+ commitMap.size(),
+ validThroughTs);
+ }
+
+ private String offsetsJson() {
+ try {
+ return MAPPER.writeValueAsString(controlTopicOffsets());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void commitToTable(
+ TableReference tableReference,
+ List<Envelope> envelopeList,
+ String offsetsJson,
+ OffsetDateTime validThroughTs) {
+ TableIdentifier tableIdentifier = tableReference.identifier();
+ Table table;
+ try {
+ table = catalog.loadTable(tableIdentifier);
+ } catch (NoSuchTableException e) {
+ LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e);
+ return;
+ }
+
+ String branch =
config.tableConfig(tableIdentifier.toString()).commitBranch();
+
+ Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table,
branch);
+
+ List<DataWritten> payloads =
+ envelopeList.stream()
+ .filter(
+ envelope -> {
+ Long minOffset = committedOffsets.get(envelope.partition());
+ return minOffset == null || envelope.offset() >= minOffset;
+ })
+ .map(envelope -> (DataWritten) envelope.event().payload())
+ .collect(Collectors.toList());
+
+ List<DataFile> dataFiles =
+ payloads.stream()
+ .filter(payload -> payload.dataFiles() != null)
+ .flatMap(payload -> payload.dataFiles().stream())
+ .filter(dataFile -> dataFile.recordCount() > 0)
+ .filter(distinctByKey(dataFile -> dataFile.path().toString()))
+ .collect(Collectors.toList());
+
+ List<DeleteFile> deleteFiles =
+ payloads.stream()
+ .filter(payload -> payload.deleteFiles() != null)
+ .flatMap(payload -> payload.deleteFiles().stream())
+ .filter(deleteFile -> deleteFile.recordCount() > 0)
+ .filter(distinctByKey(deleteFile -> deleteFile.path().toString()))
+ .collect(Collectors.toList());
+
+ if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
+ LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
+ } else {
+ if (deleteFiles.isEmpty()) {
+ AppendFiles appendOp = table.newAppend();
+ if (branch != null) {
+ appendOp.toBranch(branch);
+ }
+ appendOp.set(snapshotOffsetsProp, offsetsJson);
+ appendOp.set(COMMIT_ID_SNAPSHOT_PROP,
commitState.currentCommitId().toString());
+ if (validThroughTs != null) {
+ appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP,
validThroughTs.toString());
+ }
+ dataFiles.forEach(appendOp::appendFile);
+ appendOp.commit();
+ } else {
+ RowDelta deltaOp = table.newRowDelta();
+ if (branch != null) {
+ deltaOp.toBranch(branch);
+ }
+ deltaOp.set(snapshotOffsetsProp, offsetsJson);
+ deltaOp.set(COMMIT_ID_SNAPSHOT_PROP,
commitState.currentCommitId().toString());
+ if (validThroughTs != null) {
+ deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP,
validThroughTs.toString());
+ }
+ dataFiles.forEach(deltaOp::addRows);
+ deleteFiles.forEach(deltaOp::addDeletes);
+ deltaOp.commit();
+ }
+
+ Long snapshotId = latestSnapshot(table, branch).snapshotId();
+ Event event =
+ new Event(
+ config.connectGroupId(),
+ new CommitToTable(
+ commitState.currentCommitId(), tableReference, snapshotId,
validThroughTs));
+ send(event);
+
+ LOG.info(
+ "Commit complete to table {}, snapshot {}, commit ID {},
valid-through {}",
+ tableIdentifier,
+ snapshotId,
+ commitState.currentCommitId(),
+ validThroughTs);
+ }
+ }
+
+ private <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
+ Map<Object, Boolean> seen = Maps.newConcurrentMap();
+ return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
+ }
+
+ private Snapshot latestSnapshot(Table table, String branch) {
+ if (branch == null) {
+ return table.currentSnapshot();
+ }
+ return table.snapshot(branch);
+ }
+
+ private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String
branch) {
+ Snapshot snapshot = latestSnapshot(table, branch);
+ while (snapshot != null) {
+ Map<String, String> summary = snapshot.summary();
+ String value = summary.get(snapshotOffsetsProp);
+ if (value != null) {
+ TypeReference<Map<Integer, Long>> typeRef = new
TypeReference<Map<Integer, Long>>() {};
+ try {
+ return MAPPER.readValue(value, typeRef);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ Long parentSnapshotId = snapshot.parentId();
+ snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) :
null;
+ }
+ return ImmutableMap.of();
+ }
+
+ @Override
+ void stop() {
+ exec.shutdownNow();
+
+ // ensure coordinator tasks are shut down, else cause the sink worker to
fail
+ try {
+ if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
+ throw new RuntimeException("Timed out waiting for coordinator
shutdown");
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for coordinator
shutdown", e);
+ }
+
+ super.stop();
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java
new file mode 100644
index 0000000000..6a31b17fc6
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CoordinatorThread extends Thread {
+ private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorThread.class);
+ private static final String THREAD_NAME = "iceberg-coord";
+
+ private Coordinator coordinator;
+ private volatile boolean terminated;
+
+ CoordinatorThread(Coordinator coordinator) {
+ super(THREAD_NAME);
+ this.coordinator = coordinator;
+ }
+
+ @Override
+ public void run() {
+ try {
+ coordinator.start();
+ } catch (Exception e) {
+ LOG.error("Coordinator error during start, exiting thread", e);
+ terminated = true;
+ }
+
+ while (!terminated) {
+ try {
+ coordinator.process();
+ } catch (Exception e) {
+ LOG.error("Coordinator error during process, exiting thread", e);
+ terminated = true;
+ }
+ }
+
+ try {
+ coordinator.stop();
+ } catch (Exception e) {
+ LOG.error("Coordinator error during stop, ignoring", e);
+ }
+ coordinator = null;
+ }
+
+ boolean isTerminated() {
+ return terminated;
+ }
+
+ void terminate() {
+ terminated = true;
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java
similarity index 64%
copy from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
copy to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java
index 64ca44f032..87a93d0585 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Envelope.java
@@ -16,25 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect.channel;
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.iceberg.connect.events.Event;
-class NoOpWriter implements RecordWriter {
- @Override
- public void write(SinkRecord record) {
- // NO-OP
+class Envelope {
+ private final Event event;
+ private final int partition;
+ private final long offset;
+
+ Envelope(Event event, int partition, long offset) {
+ this.event = event;
+ this.partition = partition;
+ this.offset = offset;
+ }
+
+ Event event() {
+ return event;
}
- @Override
- public List<WriterResult> complete() {
- // NO-OP
- return null;
+ int partition() {
+ return partition;
}
- @Override
- public void close() {
- // NO-OP
+ long offset() {
+ return offset;
}
}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java
new file mode 100644
index 0000000000..fd5d27ae34
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaClientFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+class KafkaClientFactory {
+ private final Map<String, String> kafkaProps;
+
+ KafkaClientFactory(Map<String, String> kafkaProps) {
+ this.kafkaProps = kafkaProps;
+ }
+
+ Producer<String, byte[]> createProducer(String transactionalId) {
+ Map<String, Object> producerProps = Maps.newHashMap(kafkaProps);
+ producerProps.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString());
+ producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+ KafkaProducer<String, byte[]> result =
+ new KafkaProducer<>(producerProps, new StringSerializer(), new
ByteArraySerializer());
+ result.initTransactions();
+ return result;
+ }
+
+ Consumer<String, byte[]> createConsumer(String consumerGroupId) {
+ Map<String, Object> consumerProps = Maps.newHashMap(kafkaProps);
+ consumerProps.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"latest");
+ consumerProps.putIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString());
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
+ return new KafkaConsumer<>(
+ consumerProps, new StringDeserializer(), new ByteArrayDeserializer());
+ }
+
+ Admin createAdmin() {
+ Map<String, Object> adminProps = Maps.newHashMap(kafkaProps);
+ return Admin.create(adminProps);
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
new file mode 100644
index 0000000000..be51fff8bf
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/KafkaUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+class KafkaUtils {
+
+ private static final String CONTEXT_CLASS_NAME =
+ "org.apache.kafka.connect.runtime.WorkerSinkTaskContext";
+
+ static ConsumerGroupDescription consumerGroupDescription(String
consumerGroupId, Admin admin) {
+ try {
+ DescribeConsumerGroupsResult result =
+ admin.describeConsumerGroups(ImmutableList.of(consumerGroupId));
+ return result.describedGroups().get(consumerGroupId).get();
+
+ } catch (InterruptedException | ExecutionException e) {
+ throw new ConnectException(
+ "Cannot retrieve members for consumer group: " + consumerGroupId, e);
+ }
+ }
+
+ static ConsumerGroupMetadata consumerGroupMetadata(SinkTaskContext context) {
+ return kafkaConsumer(context).groupMetadata();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Consumer<byte[], byte[]> kafkaConsumer(SinkTaskContext
context) {
+ String contextClassName = context.getClass().getName();
+ try {
+ return ((Consumer<byte[], byte[]>)
+ DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME,
"consumer").build(context).get());
+ } catch (Exception e) {
+ throw new ConnectException(
+ "Unable to retrieve consumer from context: " + contextClassName, e);
+ }
+ }
+
+ private KafkaUtils() {}
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java
similarity index 76%
copy from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
copy to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java
index 0b4d7566ea..72a362ceac 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/NotRunningException.java
@@ -16,16 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect.channel;
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
-
-public interface RecordWriter extends Cloneable {
-
- void write(SinkRecord record);
-
- List<WriterResult> complete();
-
- void close();
/**
 * Signals that a sink component which should still be running has stopped, for example a
 * coordinator thread that terminated unexpectedly.
 */
public class NotRunningException extends RuntimeException {

  /**
   * Creates the exception with a detail message and no cause.
   *
   * @param message description of what is no longer running
   */
  public NotRunningException(String message) {
    super(message);
  }
}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
new file mode 100644
index 0000000000..7555b216cd
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.Offset;
+import org.apache.iceberg.connect.data.SinkWriter;
+import org.apache.iceberg.connect.data.SinkWriterResult;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.PayloadType;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+class Worker extends Channel {
+
+ private final IcebergSinkConfig config;
+ private final SinkTaskContext context;
+ private final SinkWriter sinkWriter;
+
+ Worker(
+ IcebergSinkConfig config,
+ KafkaClientFactory clientFactory,
+ SinkWriter sinkWriter,
+ SinkTaskContext context) {
+ // pass transient consumer group ID to which we never commit offsets
+ super(
+ "worker",
+ IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(),
+ config,
+ clientFactory,
+ context);
+
+ this.config = config;
+ this.context = context;
+ this.sinkWriter = sinkWriter;
+ }
+
+ void process() {
+ consumeAvailable(Duration.ZERO);
+ }
+
+ @Override
+ protected boolean receive(Envelope envelope) {
+ Event event = envelope.event();
+ if (event.payload().type() != PayloadType.START_COMMIT) {
+ return false;
+ }
+
+ SinkWriterResult results = sinkWriter.completeWrite();
+
+ // include all assigned topic partitions even if no messages were read
+ // from a partition, as the coordinator will use that to determine
+ // when all data for a commit has been received
+ List<TopicPartitionOffset> assignments =
+ context.assignment().stream()
+ .map(
+ tp -> {
+ Offset offset = results.sourceOffsets().get(tp);
+ if (offset == null) {
+ offset = Offset.NULL_OFFSET;
+ }
+ return new TopicPartitionOffset(
+ tp.topic(), tp.partition(), offset.offset(),
offset.timestamp());
+ })
+ .collect(Collectors.toList());
+
+ UUID commitId = ((StartCommit) event.payload()).commitId();
+
+ List<Event> events =
+ results.writerResults().stream()
+ .map(
+ writeResult ->
+ new Event(
+ config.connectGroupId(),
+ new DataWritten(
+ writeResult.partitionStruct(),
+ commitId,
+ TableReference.of(config.catalogName(),
writeResult.tableIdentifier()),
+ writeResult.dataFiles(),
+ writeResult.deleteFiles())))
+ .collect(Collectors.toList());
+
+ Event readyEvent = new Event(config.connectGroupId(), new
DataComplete(commitId, assignments));
+ events.add(readyEvent);
+
+ send(events, results.sourceOffsets());
+
+ return true;
+ }
+
+ @Override
+ void stop() {
+ super.stop();
+ sinkWriter.close();
+ }
+
+ void save(Collection<SinkRecord> sinkRecords) {
+ sinkWriter.save(sinkRecords);
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
index 27ffc4de99..6df6b09151 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
@@ -32,16 +32,16 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
-public class IcebergWriter implements RecordWriter {
+class IcebergWriter implements RecordWriter {
private final Table table;
private final String tableName;
private final IcebergSinkConfig config;
- private final List<WriterResult> writerResults;
+ private final List<IcebergWriterResult> writerResults;
private RecordConverter recordConverter;
private TaskWriter<Record> writer;
- public IcebergWriter(Table table, String tableName, IcebergSinkConfig
config) {
+ IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
this.table = table;
this.tableName = tableName;
this.config = config;
@@ -50,7 +50,7 @@ public class IcebergWriter implements RecordWriter {
}
private void initNewWriter() {
- this.writer = Utilities.createTableWriter(table, tableName, config);
+ this.writer = RecordUtils.createTableWriter(table, tableName, config);
this.recordConverter = new RecordConverter(table, config);
}
@@ -102,7 +102,7 @@ public class IcebergWriter implements RecordWriter {
}
writerResults.add(
- new WriterResult(
+ new IcebergWriterResult(
TableIdentifier.parse(tableName),
Arrays.asList(writeResult.dataFiles()),
Arrays.asList(writeResult.deleteFiles()),
@@ -110,10 +110,10 @@ public class IcebergWriter implements RecordWriter {
}
@Override
- public List<WriterResult> complete() {
+ public List<IcebergWriterResult> complete() {
flush();
- List<WriterResult> result = Lists.newArrayList(writerResults);
+ List<IcebergWriterResult> result = Lists.newArrayList(writerResults);
writerResults.clear();
return result;
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index 47dcddcb99..92f5af2d7a 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -40,20 +40,19 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class IcebergWriterFactory {
+class IcebergWriterFactory {
private static final Logger LOG =
LoggerFactory.getLogger(IcebergWriterFactory.class);
private final Catalog catalog;
private final IcebergSinkConfig config;
- public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
+ IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
this.catalog = catalog;
this.config = config;
}
- public RecordWriter createWriter(
- String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+ RecordWriter createWriter(String tableName, SinkRecord sample, boolean
ignoreMissingTable) {
TableIdentifier identifier = TableIdentifier.parse(tableName);
Table table;
try {
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
similarity index 96%
rename from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java
rename to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
index cb3a700da2..58695a5572 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterResult.java
@@ -24,14 +24,14 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types.StructType;
-public class WriterResult {
+public class IcebergWriterResult {
private final TableIdentifier tableIdentifier;
private final List<DataFile> dataFiles;
private final List<DeleteFile> deleteFiles;
private final StructType partitionStruct;
- public WriterResult(
+ public IcebergWriterResult(
TableIdentifier tableIdentifier,
List<DataFile> dataFiles,
List<DeleteFile> deleteFiles,
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
index 64ca44f032..a7d2c90972 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.connect.data;
import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.kafka.connect.sink.SinkRecord;
class NoOpWriter implements RecordWriter {
@@ -28,9 +29,9 @@ class NoOpWriter implements RecordWriter {
}
@Override
- public List<WriterResult> complete() {
+ public List<IcebergWriterResult> complete() {
// NO-OP
- return null;
+ return ImmutableList.of();
}
@Override
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java
similarity index 55%
copy from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
copy to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java
index 0b4d7566ea..c4522a4071 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java
@@ -18,14 +18,37 @@
*/
package org.apache.iceberg.connect.data;
-import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import java.time.OffsetDateTime;
+import java.util.Objects;
-public interface RecordWriter extends Cloneable {
+public class Offset implements Comparable<Offset> {
- void write(SinkRecord record);
+ public static final Offset NULL_OFFSET = new Offset(null, null);
- List<WriterResult> complete();
+ private final Long offset;
+ private final OffsetDateTime timestamp;
- void close();
+ public Offset(Long offset, OffsetDateTime timestamp) {
+ this.offset = offset;
+ this.timestamp = timestamp;
+ }
+
+ public Long offset() {
+ return offset;
+ }
+
+ public OffsetDateTime timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int compareTo(Offset other) {
+ if (Objects.equals(this.offset, other.offset)) {
+ return 0;
+ }
+ if (this.offset == null || (other.offset != null && other.offset >
this.offset)) {
+ return -1;
+ }
+ return 1;
+ }
}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
similarity index 68%
rename from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java
rename to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
index 4ff83f7775..5ac9307397 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
@@ -18,25 +18,14 @@
*/
package org.apache.iceberg.connect.data;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.common.DynClasses;
-import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.common.DynMethods.BoundMethod;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
@@ -46,7 +35,6 @@ import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
@@ -55,71 +43,11 @@ import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class Utilities {
-
- private static final Logger LOG =
LoggerFactory.getLogger(Utilities.class.getName());
- private static final List<String> HADOOP_CONF_FILES =
- ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
-
- public static Catalog loadCatalog(IcebergSinkConfig config) {
- return CatalogUtil.buildIcebergCatalog(
- config.catalogName(), config.catalogProps(), loadHadoopConfig(config));
- }
-
- // use reflection here to avoid requiring Hadoop as a dependency
- private static Object loadHadoopConfig(IcebergSinkConfig config) {
- Class<?> configClass =
- DynClasses.builder()
- .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
- .impl("org.apache.hadoop.conf.Configuration")
- .orNull()
- .build();
-
- if (configClass == null) {
- LOG.info("Hadoop not found on classpath, not creating Hadoop config");
- return null;
- }
-
- try {
- Object result =
DynConstructors.builder().hiddenImpl(configClass).build().newInstance();
- BoundMethod addResourceMethod =
- DynMethods.builder("addResource").impl(configClass,
URL.class).build(result);
- BoundMethod setMethod =
- DynMethods.builder("set").impl(configClass, String.class,
String.class).build(result);
-
- // load any config files in the specified config directory
- String hadoopConfDir = config.hadoopConfDir();
- if (hadoopConfDir != null) {
- HADOOP_CONF_FILES.forEach(
- confFile -> {
- Path path = Paths.get(hadoopConfDir, confFile);
- if (Files.exists(path)) {
- try {
- addResourceMethod.invoke(path.toUri().toURL());
- } catch (IOException e) {
- LOG.warn("Error adding Hadoop resource {}, resource was not
added", path, e);
- }
- }
- });
- }
-
- // set any Hadoop properties specified in the sink config
- config.hadoopProps().forEach(setMethod::invoke);
-
- LOG.info("Hadoop config initialized: {}", configClass.getName());
- return result;
- } catch (Exception e) {
- LOG.warn(
- "Hadoop found on classpath but could not create config, proceeding
without config", e);
- }
- return null;
- }
+class RecordUtils {
@SuppressWarnings("unchecked")
- public static Object extractFromRecordValue(Object recordValue, String
fieldName) {
+ static Object extractFromRecordValue(Object recordValue, String fieldName) {
List<String> fields = Splitter.on('.').splitToList(fieldName);
if (recordValue instanceof Struct) {
return valueFromStruct((Struct) recordValue, fields);
@@ -243,5 +171,5 @@ public class Utilities {
return writer;
}
- private Utilities() {}
+ private RecordUtils() {}
}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
index 0b4d7566ea..56438dde2e 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java
@@ -21,11 +21,11 @@ package org.apache.iceberg.connect.data;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
-public interface RecordWriter extends Cloneable {
+interface RecordWriter extends Cloneable {
void write(SinkRecord record);
- List<WriterResult> complete();
+ List<IcebergWriterResult> complete();
void close();
}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
new file mode 100644
index 0000000000..35a2957f01
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkWriter {
+ private final IcebergSinkConfig config;
+ private final IcebergWriterFactory writerFactory;
+ private final Map<String, RecordWriter> writers;
+ private final Map<TopicPartition, Offset> sourceOffsets;
+
+ public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
+ this.config = config;
+ this.writerFactory = new IcebergWriterFactory(catalog, config);
+ this.writers = Maps.newHashMap();
+ this.sourceOffsets = Maps.newHashMap();
+ }
+
+ public void close() {
+ writers.values().forEach(RecordWriter::close);
+ }
+
+ public SinkWriterResult completeWrite() {
+ List<IcebergWriterResult> writerResults =
+ writers.values().stream()
+ .flatMap(writer -> writer.complete().stream())
+ .collect(Collectors.toList());
+ Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
+
+ writers.clear();
+ sourceOffsets.clear();
+
+ return new SinkWriterResult(writerResults, offsets);
+ }
+
+ public void save(Collection<SinkRecord> sinkRecords) {
+ sinkRecords.forEach(this::save);
+ }
+
+ private void save(SinkRecord record) {
+ // the consumer stores the offsets that corresponds to the next record to
consume,
+ // so increment the record offset by one
+ OffsetDateTime timestamp =
+ record.timestamp() == null
+ ? null
+ :
OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()),
ZoneOffset.UTC);
+ sourceOffsets.put(
+ new TopicPartition(record.topic(), record.kafkaPartition()),
+ new Offset(record.kafkaOffset() + 1, timestamp));
+
+ if (config.dynamicTablesEnabled()) {
+ routeRecordDynamically(record);
+ } else {
+ routeRecordStatically(record);
+ }
+ }
+
+ private void routeRecordStatically(SinkRecord record) {
+ String routeField = config.tablesRouteField();
+
+ if (routeField == null) {
+ // route to all tables
+ config
+ .tables()
+ .forEach(
+ tableName -> {
+ writerForTable(tableName, record, false).write(record);
+ });
+
+ } else {
+ String routeValue = extractRouteValue(record.value(), routeField);
+ if (routeValue != null) {
+ config
+ .tables()
+ .forEach(
+ tableName -> {
+ Pattern regex = config.tableConfig(tableName).routeRegex();
+ if (regex != null && regex.matcher(routeValue).matches()) {
+ writerForTable(tableName, record, false).write(record);
+ }
+ });
+ }
+ }
+ }
+
+ private void routeRecordDynamically(SinkRecord record) {
+ String routeField = config.tablesRouteField();
+ Preconditions.checkNotNull(routeField, "Route field cannot be null with
dynamic routing");
+
+ String routeValue = extractRouteValue(record.value(), routeField);
+ if (routeValue != null) {
+ String tableName = routeValue.toLowerCase();
+ writerForTable(tableName, record, true).write(record);
+ }
+ }
+
+ private String extractRouteValue(Object recordValue, String routeField) {
+ if (recordValue == null) {
+ return null;
+ }
+ Object routeValue = RecordUtils.extractFromRecordValue(recordValue,
routeField);
+ return routeValue == null ? null : routeValue.toString();
+ }
+
+ private RecordWriter writerForTable(
+ String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+ return writers.computeIfAbsent(
+ tableName, notUsed -> writerFactory.createWriter(tableName, sample,
ignoreMissingTable));
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java
similarity index 60%
copy from
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
copy to
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java
index 64ca44f032..ef899102bb 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriterResult.java
@@ -19,22 +19,24 @@
package org.apache.iceberg.connect.data;
import java.util.List;
-import org.apache.kafka.connect.sink.SinkRecord;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
-class NoOpWriter implements RecordWriter {
- @Override
- public void write(SinkRecord record) {
- // NO-OP
+public class SinkWriterResult {
+ private final List<IcebergWriterResult> writerResults;
+ private final Map<TopicPartition, Offset> sourceOffsets;
+
+ public SinkWriterResult(
+ List<IcebergWriterResult> writerResults, Map<TopicPartition, Offset>
sourceOffsets) {
+ this.writerResults = writerResults;
+ this.sourceOffsets = sourceOffsets;
}
- @Override
- public List<WriterResult> complete() {
- // NO-OP
- return null;
+ public List<IcebergWriterResult> writerResults() {
+ return writerResults;
}
- @Override
- public void close() {
- // NO-OP
+ public Map<TopicPartition, Offset> sourceOffsets() {
+ return sourceOffsets;
}
}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java
similarity index 60%
rename from
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java
rename to
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java
index cfa1709da7..ce92b3efc3 100644
---
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/CatalogUtilsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.connect.data;
+package org.apache.iceberg.connect;
import static org.assertj.core.api.Assertions.assertThat;
@@ -27,19 +27,15 @@ import java.nio.file.Path;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-public class UtilitiesTest {
+public class CatalogUtilsTest {
private static final String HADOOP_CONF_TEMPLATE =
"<configuration><property><name>%s</name><value>%s</value></property></configuration>";
@@ -68,7 +64,7 @@ public class UtilitiesTest {
"iceberg.catalog.catalog-impl",
TestCatalog.class.getName());
IcebergSinkConfig config = new IcebergSinkConfig(props);
- Catalog result = Utilities.loadCatalog(config);
+ Catalog result = CatalogUtils.loadCatalog(config);
assertThat(result).isInstanceOf(TestCatalog.class);
@@ -102,7 +98,7 @@ public class UtilitiesTest {
"iceberg.catalog.catalog-impl",
TestCatalog.class.getName());
IcebergSinkConfig config = new IcebergSinkConfig(props);
- Catalog result = Utilities.loadCatalog(config);
+ Catalog result = CatalogUtils.loadCatalog(config);
assertThat(result).isInstanceOf(TestCatalog.class);
@@ -118,66 +114,4 @@ public class UtilitiesTest {
// check that core-site.xml was loaded
assertThat(conf.get("foo")).isEqualTo("bar");
}
-
- @Test
- public void testExtractFromRecordValueStruct() {
- Schema valSchema = SchemaBuilder.struct().field("key",
Schema.INT64_SCHEMA).build();
- Struct val = new Struct(valSchema).put("key", 123L);
- Object result = Utilities.extractFromRecordValue(val, "key");
- assertThat(result).isEqualTo(123L);
- }
-
- @Test
- public void testExtractFromRecordValueStructNested() {
- Schema idSchema = SchemaBuilder.struct().field("key",
Schema.INT64_SCHEMA).build();
- Schema dataSchema = SchemaBuilder.struct().field("id", idSchema).build();
- Schema valSchema = SchemaBuilder.struct().field("data",
dataSchema).build();
-
- Struct id = new Struct(idSchema).put("key", 123L);
- Struct data = new Struct(dataSchema).put("id", id);
- Struct val = new Struct(valSchema).put("data", data);
-
- Object result = Utilities.extractFromRecordValue(val, "data.id.key");
- assertThat(result).isEqualTo(123L);
- }
-
- @Test
- public void testExtractFromRecordValueStructNull() {
- Schema valSchema = SchemaBuilder.struct().field("key",
Schema.INT64_SCHEMA).build();
- Struct val = new Struct(valSchema).put("key", 123L);
-
- Object result = Utilities.extractFromRecordValue(val, "");
- assertThat(result).isNull();
-
- result = Utilities.extractFromRecordValue(val, "xkey");
- assertThat(result).isNull();
- }
-
- @Test
- public void testExtractFromRecordValueMap() {
- Map<String, Object> val = ImmutableMap.of("key", 123L);
- Object result = Utilities.extractFromRecordValue(val, "key");
- assertThat(result).isEqualTo(123L);
- }
-
- @Test
- public void testExtractFromRecordValueMapNested() {
- Map<String, Object> id = ImmutableMap.of("key", 123L);
- Map<String, Object> data = ImmutableMap.of("id", id);
- Map<String, Object> val = ImmutableMap.of("data", data);
-
- Object result = Utilities.extractFromRecordValue(val, "data.id.key");
- assertThat(result).isEqualTo(123L);
- }
-
- @Test
- public void testExtractFromRecordValueMapNull() {
- Map<String, Object> val = ImmutableMap.of("key", 123L);
-
- Object result = Utilities.extractFromRecordValue(val, "");
- assertThat(result).isNull();
-
- result = Utilities.extractFromRecordValue(val, "xkey");
- assertThat(result).isNull();
- }
}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
new file mode 100644
index 0000000000..e6ffefbd97
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+public class ChannelTestBase {
+ protected static final String SRC_TOPIC_NAME = "src-topic";
+ protected static final String CTL_TOPIC_NAME = "ctl-topic";
+ protected static final String CONNECT_CONSUMER_GROUP_ID = "cg-connect";
+ protected InMemoryCatalog catalog;
+ protected Table table;
+ protected IcebergSinkConfig config;
+ protected KafkaClientFactory clientFactory;
+ protected MockProducer<String, byte[]> producer;
+ protected MockConsumer<String, byte[]> consumer;
+ protected Admin admin;
+
+ private InMemoryCatalog initInMemoryCatalog() {
+ InMemoryCatalog inMemoryCatalog = new InMemoryCatalog();
+ inMemoryCatalog.initialize(null, ImmutableMap.of());
+ return inMemoryCatalog;
+ }
+
+ protected static final Namespace NAMESPACE = Namespace.of("db");
+ protected static final String TABLE_NAME = "tbl";
+ protected static final TableIdentifier TABLE_IDENTIFIER =
+ TableIdentifier.of(NAMESPACE, TABLE_NAME);
+ protected static final Schema SCHEMA =
+ new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(2, "data", Types.StringType.get()),
+ required(3, "date", Types.StringType.get()));
+
+ protected static final String COMMIT_ID_SNAPSHOT_PROP =
"kafka.connect.commit-id";
+ protected static final String OFFSETS_SNAPSHOT_PROP =
+ String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME,
CONNECT_CONSUMER_GROUP_ID);
+ protected static final String VALID_THROUGH_TS_SNAPSHOT_PROP =
"kafka.connect.valid-through-ts";
+
+ @BeforeEach
+ @SuppressWarnings("deprecation")
+ public void before() {
+ catalog = initInMemoryCatalog();
+ catalog.createNamespace(NAMESPACE);
+ table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
+
+ config = mock(IcebergSinkConfig.class);
+ when(config.controlTopic()).thenReturn(CTL_TOPIC_NAME);
+ when(config.commitThreads()).thenReturn(1);
+ when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+
+ TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class);
+ when(partitionInfo.partition()).thenReturn(0);
+ TopicDescription topicDesc =
+ new TopicDescription(SRC_TOPIC_NAME, false,
ImmutableList.of(partitionInfo));
+ DescribeTopicsResult describeResult = mock(DescribeTopicsResult.class);
+ when(describeResult.values())
+ .thenReturn(ImmutableMap.of(SRC_TOPIC_NAME,
KafkaFuture.completedFuture(topicDesc)));
+
+ admin = mock(Admin.class);
+ when(admin.describeTopics(anyCollection())).thenReturn(describeResult);
+
+ producer = new MockProducer<>(false, new StringSerializer(), new
ByteArraySerializer());
+ producer.initTransactions();
+
+ consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+
+ clientFactory = mock(KafkaClientFactory.class);
+ when(clientFactory.createProducer(any())).thenReturn(producer);
+ when(clientFactory.createConsumer(any())).thenReturn(consumer);
+ when(clientFactory.createAdmin()).thenReturn(admin);
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ catalog.close();
+ }
+
+ protected void initConsumer() {
+ TopicPartition tp = new TopicPartition(CTL_TOPIC_NAME, 0);
+ consumer.rebalance(ImmutableList.of(tp));
+ consumer.updateBeginningOffsets(ImmutableMap.of(tp, 0L));
+ }
+}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java
new file mode 100644
index 0000000000..a9fe1ad099
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitStateTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.Payload;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+public class CommitStateTest {
+
+  /**
+   * Expects only DataComplete responses carrying the current commit ID to count: the two matching
+   * responses cover three partitions, while the third response belongs to a different commit.
+   */
+  @Test
+  public void testIsCommitReady() {
+    TopicPartitionOffset partitionOffset = mock(TopicPartitionOffset.class);
+
+    CommitState state = new CommitState(mock(IcebergSinkConfig.class));
+    state.startNewCommit();
+
+    DataComplete twoPartitions = dataComplete(partitionOffset, partitionOffset);
+    when(twoPartitions.commitId()).thenReturn(state.currentCommitId());
+
+    DataComplete onePartition = dataComplete(partitionOffset);
+    when(onePartition.commitId()).thenReturn(state.currentCommitId());
+
+    DataComplete otherCommit = dataComplete(partitionOffset);
+    when(otherCommit.commitId()).thenReturn(UUID.randomUUID());
+
+    for (DataComplete payload : ImmutableList.of(twoPartitions, onePartition, otherCommit)) {
+      state.addReady(wrapInEnvelope(payload));
+    }
+
+    assertThat(state.isCommitReady(3)).isTrue();
+    assertThat(state.isCommitReady(4)).isFalse();
+  }
+
+  /**
+   * Expects {@code validThroughTs(false)} to be the earliest reported partition timestamp and
+   * {@code validThroughTs(true)} to be null; once any partition reports a null timestamp, both
+   * variants are expected to be null.
+   */
+  @Test
+  public void testGetValidThroughTs() {
+    OffsetDateTime ts1 = EventTestUtil.now();
+    OffsetDateTime ts2 = ts1.plusSeconds(1);
+    OffsetDateTime ts3 = ts1.plusSeconds(2);
+
+    DataComplete payload1 = dataComplete(offsetAt(ts1), offsetAt(ts2));
+    DataComplete payload2 = dataComplete(offsetAt(ts3));
+
+    CommitState state = new CommitState(mock(IcebergSinkConfig.class));
+    state.startNewCommit();
+    state.addReady(wrapInEnvelope(payload1));
+    state.addReady(wrapInEnvelope(payload2));
+
+    assertThat(state.validThroughTs(false)).isEqualTo(ts1);
+    assertThat(state.validThroughTs(true)).isNull();
+
+    // null timestamp for one, so should not set a valid-through timestamp
+    DataComplete payload3 = dataComplete(offsetAt(null));
+    state.addReady(wrapInEnvelope(payload3));
+
+    assertThat(state.validThroughTs(false)).isNull();
+    assertThat(state.validThroughTs(true)).isNull();
+  }
+
+  /** Creates a mocked DataComplete payload whose assignments are the given partition offsets. */
+  private static DataComplete dataComplete(TopicPartitionOffset... assignments) {
+    DataComplete payload = mock(DataComplete.class);
+    when(payload.assignments()).thenReturn(ImmutableList.copyOf(assignments));
+    return payload;
+  }
+
+  /** Creates a mocked partition offset whose timestamp is {@code ts}, which may be null. */
+  private static TopicPartitionOffset offsetAt(OffsetDateTime ts) {
+    TopicPartitionOffset offset = mock(TopicPartitionOffset.class);
+    when(offset.timestamp()).thenReturn(ts);
+    return offset;
+  }
+
+  /** Wraps a payload in a mocked Event and builds an Envelope with both numeric arguments 0. */
+  private Envelope wrapInEnvelope(Payload payload) {
+    Event wrapped = mock(Event.class);
+    when(wrapped.payload()).thenReturn(payload);
+    return new Envelope(wrapped, 0, 0);
+  }
+}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java
new file mode 100644
index 0000000000..7c8ccf8ef6
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CommitterImplTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+public class CommitterImplTest {
+
+  /**
+   * A task holding the first member's partitions (listed in a different order) is expected to be
+   * the leader; a task holding the second member's partitions is not.
+   */
+  @Test
+  public void testIsLeader() {
+    CommitterImpl committer = new CommitterImpl();
+
+    List<MemberDescription> members =
+        ImmutableList.of(
+            member(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1)),
+            member(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1)));
+
+    List<TopicPartition> firstMemberPartitions =
+        ImmutableList.of(new TopicPartition("topic2", 1), new TopicPartition("topic1", 0));
+    assertThat(committer.isLeader(members, firstMemberPartitions)).isTrue();
+
+    List<TopicPartition> secondMemberPartitions =
+        ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1));
+    assertThat(committer.isLeader(members, secondMemberPartitions)).isFalse();
+  }
+
+  /** Builds a member description with null identity fields and the given partition assignment. */
+  private static MemberDescription member(TopicPartition... partitions) {
+    MemberAssignment assignment = new MemberAssignment(ImmutableSet.copyOf(partitions));
+    return new MemberDescription(null, Optional.empty(), null, null, assignment);
+  }
+}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java
new file mode 100644
index 0000000000..9c0b8122ae
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.connect.events.AvroUtil;
+import org.apache.iceberg.connect.events.CommitComplete;
+import org.apache.iceberg.connect.events.CommitToTable;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.PayloadType;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class CoordinatorTest extends ChannelTestBase {
+
+  /** A round with only data files should produce a single APPEND snapshot. */
+  @Test
+  public void testCommitAppend() {
+    Assertions.assertEquals(0, ImmutableList.copyOf(table.snapshots().iterator()).size());
+
+    OffsetDateTime ts = EventTestUtil.now();
+    UUID commitId =
+        coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), ts);
+    // reload the table so its snapshot list reflects the coordinator's commit
+    table.refresh();
+
+    assertThat(producer.history()).hasSize(3);
+    assertCommitTable(1, commitId, ts);
+    assertCommitComplete(2, commitId, ts);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(1, snapshots.size());
+
+    Snapshot snapshot = snapshots.get(0);
+    Assertions.assertEquals(DataOperations.APPEND, snapshot.operation());
+    Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size());
+    Assertions.assertEquals(0, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size());
+
+    assertSnapshotSummary(snapshot, commitId, ts);
+  }
+
+  /** A round with both data and delete files should produce a single OVERWRITE snapshot. */
+  @Test
+  public void testCommitDelta() {
+    OffsetDateTime ts = EventTestUtil.now();
+    UUID commitId =
+        coordinatorTest(
+            ImmutableList.of(EventTestUtil.createDataFile()),
+            ImmutableList.of(EventTestUtil.createDeleteFile()),
+            ts);
+    // reload explicitly (as in testCommitAppend) instead of relying on the handle being unread
+    table.refresh();
+
+    assertThat(producer.history()).hasSize(3);
+    assertCommitTable(1, commitId, ts);
+    assertCommitComplete(2, commitId, ts);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(1, snapshots.size());
+
+    Snapshot snapshot = snapshots.get(0);
+    Assertions.assertEquals(DataOperations.OVERWRITE, snapshot.operation());
+    Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size());
+    Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size());
+
+    assertSnapshotSummary(snapshot, commitId, ts);
+  }
+
+  /** With no files, only a COMMIT_COMPLETE follows START_COMMIT and no snapshot is created. */
+  @Test
+  public void testCommitNoFiles() {
+    OffsetDateTime ts = EventTestUtil.now();
+    UUID commitId = coordinatorTest(ImmutableList.of(), ImmutableList.of(), ts);
+    table.refresh();
+
+    assertThat(producer.history()).hasSize(2);
+    assertCommitComplete(1, commitId, ts);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(0, snapshots.size());
+  }
+
+  /**
+   * A data file written with a partition spec the table does not know should make the commit fail:
+   * nothing beyond the START_COMMIT event is sent and no snapshot is created.
+   */
+  @Test
+  public void testCommitError() {
+    // this spec isn't registered with the table
+    PartitionSpec badPartitionSpec =
+        PartitionSpec.builderFor(SCHEMA).withSpecId(1).identity("id").build();
+    DataFile badDataFile =
+        DataFiles.builder(badPartitionSpec)
+            .withPath(UUID.randomUUID() + ".parquet")
+            .withFormat(FileFormat.PARQUET)
+            .withFileSizeInBytes(100L)
+            .withRecordCount(5)
+            .build();
+
+    coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+    table.refresh();
+
+    // no commit messages sent
+    assertThat(producer.history()).hasSize(1);
+
+    List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(0, snapshots.size());
+  }
+
+  /** Checks the commit-id, offsets and valid-through-ts summary properties of a snapshot. */
+  private void assertSnapshotSummary(Snapshot snapshot, UUID commitId, OffsetDateTime ts) {
+    Map<String, String> summary = snapshot.summary();
+    Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP));
+    Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP));
+    Assertions.assertEquals(ts.toString(), summary.get(VALID_THROUGH_TS_SNAPSHOT_PROP));
+  }
+
+  /** Decodes the producer record at {@code idx} and checks it is the expected COMMIT_TO_TABLE. */
+  private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) {
+    byte[] bytes = producer.history().get(idx).value();
+    Event commitTable = AvroUtil.decode(bytes);
+    assertThat(commitTable.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE);
+    CommitToTable commitToTablePayload = (CommitToTable) commitTable.payload();
+    assertThat(commitToTablePayload.commitId()).isEqualTo(commitId);
+    assertThat(commitToTablePayload.tableReference().identifier().toString())
+        .isEqualTo(TABLE_IDENTIFIER.toString());
+    assertThat(commitToTablePayload.validThroughTs()).isEqualTo(ts);
+  }
+
+  /** Decodes the producer record at {@code idx} and checks it is the expected COMMIT_COMPLETE. */
+  private void assertCommitComplete(int idx, UUID commitId, OffsetDateTime ts) {
+    byte[] bytes = producer.history().get(idx).value();
+    Event commitComplete = AvroUtil.decode(bytes);
+    assertThat(commitComplete.type()).isEqualTo(PayloadType.COMMIT_COMPLETE);
+    CommitComplete commitCompletePayload = (CommitComplete) commitComplete.payload();
+    assertThat(commitCompletePayload.commitId()).isEqualTo(commitId);
+    assertThat(commitCompletePayload.validThroughTs()).isEqualTo(ts);
+  }
+
+  /**
+   * Drives one commit round through a Coordinator and returns the commit ID it generated.
+   *
+   * <p>The first {@code process()} call is expected to emit a single START_COMMIT event in a
+   * committed transaction. The method then places a DATA_WRITTEN event with the given files and a
+   * DATA_COMPLETE event, whose single partition offset carries timestamp {@code ts}, on control
+   * topic partition 0 at offsets 1 and 2, and calls {@code process()} again.
+   */
+  private UUID coordinatorTest(
+      List<DataFile> dataFiles, List<DeleteFile> deleteFiles, OffsetDateTime ts) {
+    // zero interval and an effectively unlimited timeout for the duration of the test
+    when(config.commitIntervalMs()).thenReturn(0);
+    when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+
+    SinkTaskContext context = mock(SinkTaskContext.class);
+    Coordinator coordinator =
+        new Coordinator(catalog, config, ImmutableList.of(), clientFactory, context);
+    coordinator.start();
+
+    // init consumer after subscribe()
+    initConsumer();
+
+    coordinator.process();
+
+    assertThat(producer.transactionCommitted()).isTrue();
+    assertThat(producer.history()).hasSize(1);
+
+    byte[] bytes = producer.history().get(0).value();
+    Event commitRequest = AvroUtil.decode(bytes);
+    assertThat(commitRequest.type()).isEqualTo(PayloadType.START_COMMIT);
+
+    UUID commitId = ((StartCommit) commitRequest.payload()).commitId();
+
+    Event commitResponse =
+        new Event(
+            config.connectGroupId(),
+            new DataWritten(
+                StructType.of(),
+                commitId,
+                new TableReference("catalog", ImmutableList.of("db"), "tbl"),
+                dataFiles,
+                deleteFiles));
+    bytes = AvroUtil.encode(commitResponse);
+    consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes));
+
+    Event commitReady =
+        new Event(
+            config.connectGroupId(),
+            new DataComplete(
+                commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))));
+    bytes = AvroUtil.encode(commitReady);
+    consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", bytes));
+
+    coordinator.process();
+
+    return commitId;
+  }
+}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java
new file mode 100644
index 0000000000..da0d881f89
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/CoordinatorThreadTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.jupiter.api.Test;
+
+public class CoordinatorThreadTest {
+
+  /**
+   * Starting the thread should call {@code start()} and then {@code process()} at least once
+   * without calling {@code stop()}; {@code terminate()} should call {@code stop()} and mark the
+   * thread as terminated.
+   */
+  @Test
+  public void testRun() {
+    Coordinator coordinator = mock(Coordinator.class);
+    CoordinatorThread coordinatorThread = new CoordinatorThread(coordinator);
+
+    coordinatorThread.start();
+    try {
+      verify(coordinator, timeout(1000)).start();
+      verify(coordinator, timeout(1000).atLeast(1)).process();
+      verify(coordinator, times(0)).stop();
+      assertThat(coordinatorThread.isTerminated()).isFalse();
+    } finally {
+      // always shut the background thread down so a failed check above does not leak it
+      coordinatorThread.terminate();
+    }
+
+    verify(coordinator, timeout(1000)).stop();
+    assertThat(coordinatorThread.isTerminated()).isTrue();
+  }
+}
diff --git
a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java
similarity index 95%
copy from
kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
copy to
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java
index 8f1f7a601f..8c3625b74a 100644
---
a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/EventTestUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.connect.events;
+package org.apache.iceberg.connect.channel;
import java.nio.ByteBuffer;
import java.time.OffsetDateTime;
@@ -44,8 +44,7 @@ class EventTestUtil {
static final Schema SCHEMA =
new Schema(ImmutableList.of(Types.NestedField.required(1, "id",
Types.LongType.get())));
- static final PartitionSpec SPEC =
- PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build();
+ static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("id").build();
static final SortOrder ORDER =
SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC,
NullOrder.NULLS_FIRST).build();
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java
new file mode 100644
index 0000000000..577c28fe63
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/WorkerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.channel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.data.IcebergWriterResult;
+import org.apache.iceberg.connect.data.Offset;
+import org.apache.iceberg.connect.data.SinkWriter;
+import org.apache.iceberg.connect.data.SinkWriterResult;
+import org.apache.iceberg.connect.events.AvroUtil;
+import org.apache.iceberg.connect.events.DataComplete;
+import org.apache.iceberg.connect.events.DataWritten;
+import org.apache.iceberg.connect.events.Event;
+import org.apache.iceberg.connect.events.PayloadType;
+import org.apache.iceberg.connect.events.StartCommit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+public class WorkerTest extends ChannelTestBase {
+
+  /**
+   * After saving a record, a START_COMMIT event on the control topic should make the worker
+   * publish a DATA_WRITTEN event followed by a DATA_COMPLETE event, both carrying the requested
+   * commit ID; the DATA_COMPLETE event should report offset 1 for the single source partition.
+   */
+  @Test
+  public void testSave() {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> kafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
+      kafkaUtils.when(() -> KafkaUtils.consumerGroupMetadata(any())).thenReturn(groupMetadata);
+
+      TopicPartition sourcePartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      when(context.assignment()).thenReturn(ImmutableSet.of(sourcePartition));
+
+      // the mocked sink writer reports one data file and offset 1 for the source partition
+      IcebergWriterResult tableResult =
+          new IcebergWriterResult(
+              TableIdentifier.parse(TABLE_NAME),
+              ImmutableList.of(EventTestUtil.createDataFile()),
+              ImmutableList.of(),
+              StructType.of());
+      Map<TopicPartition, Offset> sourceOffsets =
+          ImmutableMap.of(sourcePartition, new Offset(1L, EventTestUtil.now()));
+      SinkWriterResult writerResult =
+          new SinkWriterResult(ImmutableList.of(tableResult), sourceOffsets);
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite()).thenReturn(writerResult);
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // init consumer after subscribe()
+      initConsumer();
+
+      // save a record
+      Map<String, Object> recordValue = ImmutableMap.of();
+      worker.save(
+          ImmutableList.of(
+              new SinkRecord(SRC_TOPIC_NAME, 0, null, "key", null, recordValue, 0L)));
+
+      UUID commitId = UUID.randomUUID();
+      Event startCommit = new Event(config.connectGroupId(), new StartCommit(commitId));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(startCommit)));
+
+      worker.process();
+
+      assertThat(producer.history()).hasSize(2);
+
+      Event written = AvroUtil.decode(producer.history().get(0).value());
+      assertThat(written.payload().type()).isEqualTo(PayloadType.DATA_WRITTEN);
+      assertThat(((DataWritten) written.payload()).commitId()).isEqualTo(commitId);
+
+      Event completed = AvroUtil.decode(producer.history().get(1).value());
+      assertThat(completed.type()).isEqualTo(PayloadType.DATA_COMPLETE);
+      DataComplete dataComplete = (DataComplete) completed.payload();
+      assertThat(dataComplete.commitId()).isEqualTo(commitId);
+      assertThat(dataComplete.assignments()).hasSize(1);
+      assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+    }
+  }
+}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
index 80adc7fc3e..ac44952a5c 100644
---
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java
@@ -73,7 +73,7 @@ public class BaseWriterTest {
protected WriteResult writeTest(
List<Record> rows, IcebergSinkConfig config, Class<?>
expectedWriterClass) {
- try (TaskWriter<Record> writer = Utilities.createTableWriter(table,
"name", config)) {
+ try (TaskWriter<Record> writer = RecordUtils.createTableWriter(table,
"name", config)) {
assertThat(writer.getClass()).isEqualTo(expectedWriterClass);
rows.forEach(
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java
new file mode 100644
index 0000000000..08e832256a
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+public class RecordUtilsTest {
+
+  /** A top-level field of a Connect Struct is found by its name. */
+  @Test
+  public void testExtractFromRecordValueStruct() {
+    Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build();
+    Struct val = new Struct(valSchema).put("key", 123L);
+    assertThat(RecordUtils.extractFromRecordValue(val, "key")).isEqualTo(123L);
+  }
+
+  /** A dotted path walks through nested Structs. */
+  @Test
+  public void testExtractFromRecordValueStructNested() {
+    Schema idSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build();
+    Schema dataSchema = SchemaBuilder.struct().field("id", idSchema).build();
+    Schema valSchema = SchemaBuilder.struct().field("data", dataSchema).build();
+
+    Struct val =
+        new Struct(valSchema)
+            .put("data", new Struct(dataSchema).put("id", new Struct(idSchema).put("key", 123L)));
+
+    assertThat(RecordUtils.extractFromRecordValue(val, "data.id.key")).isEqualTo(123L);
+  }
+
+  /** An empty path and a path naming a missing field both yield null for a Struct. */
+  @Test
+  public void testExtractFromRecordValueStructNull() {
+    Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build();
+    Struct val = new Struct(valSchema).put("key", 123L);
+
+    for (String path : new String[] {"", "xkey"}) {
+      assertThat(RecordUtils.extractFromRecordValue(val, path)).isNull();
+    }
+  }
+
+  /** A top-level key of a Map value is found by its name. */
+  @Test
+  public void testExtractFromRecordValueMap() {
+    Map<String, Object> val = ImmutableMap.of("key", 123L);
+    assertThat(RecordUtils.extractFromRecordValue(val, "key")).isEqualTo(123L);
+  }
+
+  /** A dotted path walks through nested Maps. */
+  @Test
+  public void testExtractFromRecordValueMapNested() {
+    Map<String, Object> val =
+        ImmutableMap.of("data", ImmutableMap.of("id", ImmutableMap.of("key", 123L)));
+
+    assertThat(RecordUtils.extractFromRecordValue(val, "data.id.key")).isEqualTo(123L);
+  }
+
+  /** An empty path and a path naming a missing key both yield null for a Map. */
+  @Test
+  public void testExtractFromRecordValueMapNull() {
+    Map<String, Object> val = ImmutableMap.of("key", 123L);
+
+    for (String path : new String[] {"", "xkey"}) {
+      assertThat(RecordUtils.extractFromRecordValue(val, path)).isNull();
+    }
+  }
+}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java
new file mode 100644
index 0000000000..be29ef1022
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUpdateTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class SchemaUpdateTest {
+
+  /** Recording an added column populates only the add-column collection. */
+  @Test
+  public void testAddColumn() {
+    SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
+    updates.addColumn("parent", "name", Types.StringType.get());
+
+    assertThat(updates.addColumns()).hasSize(1);
+    assertThat(updates.updateTypes()).isEmpty();
+    assertThat(updates.makeOptionals()).isEmpty();
+
+    SchemaUpdate.AddColumn added = updates.addColumns().iterator().next();
+    assertThat(added.parentName()).isEqualTo("parent");
+    assertThat(added.name()).isEqualTo("name");
+    assertThat(added.type()).isEqualTo(Types.StringType.get());
+  }
+
+  /** Recording a type change populates only the update-type collection. */
+  @Test
+  public void testUpdateType() {
+    SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
+    updates.updateType("name", Types.LongType.get());
+
+    assertThat(updates.addColumns()).isEmpty();
+    assertThat(updates.updateTypes()).hasSize(1);
+    assertThat(updates.makeOptionals()).isEmpty();
+
+    SchemaUpdate.UpdateType changed = updates.updateTypes().iterator().next();
+    assertThat(changed.name()).isEqualTo("name");
+    assertThat(changed.type()).isEqualTo(Types.LongType.get());
+  }
+
+  /** Recording a make-optional change populates only the make-optional collection. */
+  @Test
+  public void testMakeOptional() {
+    SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
+    updates.makeOptional("name");
+
+    assertThat(updates.addColumns()).isEmpty();
+    assertThat(updates.updateTypes()).isEmpty();
+    assertThat(updates.makeOptionals()).hasSize(1);
+
+    SchemaUpdate.MakeOptional relaxed = updates.makeOptionals().iterator().next();
+    assertThat(relaxed.name()).isEqualTo("name");
+  }
+}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
new file mode 100644
index 0000000000..4a17b926fc
--- /dev/null
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SinkWriterTest {
+
+ private InMemoryCatalog catalog;
+
+ private static final Namespace NAMESPACE = Namespace.of("db");
+ private static final String TABLE_NAME = "tbl";
+ private static final TableIdentifier TABLE_IDENTIFIER =
TableIdentifier.of(NAMESPACE, TABLE_NAME);
+ private static final Schema SCHEMA =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "data", Types.StringType.get()),
+ optional(3, "date", Types.StringType.get()));
+ private static final String ROUTE_FIELD = "fld";
+
+ @BeforeEach
+ public void before() {
+ catalog = initInMemoryCatalog();
+ catalog.createNamespace(NAMESPACE);
+ catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ catalog.close();
+ }
+
+ private InMemoryCatalog initInMemoryCatalog() {
+ InMemoryCatalog inMemoryCatalog = new InMemoryCatalog();
+ inMemoryCatalog.initialize(null, ImmutableMap.of());
+ return inMemoryCatalog;
+ }
+
+ @Test
+ public void testDefaultRoute() {
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+
when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+ Map<String, Object> value = ImmutableMap.of();
+
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(1);
+ IcebergWriterResult writerResult = writerResults.get(0);
+ assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+ }
+
+ @Test
+ public void testDefaultNoRoute() {
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.tables()).thenReturn(ImmutableList.of());
+ Map<String, Object> value = ImmutableMap.of();
+
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testStaticRoute() {
+ TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+ when(tableConfig.routeRegex()).thenReturn(Pattern.compile("val"));
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+
when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+ when(config.tableConfig(any())).thenReturn(tableConfig);
+ when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+ Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD, "val");
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(1);
+ IcebergWriterResult writerResult = writerResults.get(0);
+ assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+ }
+
+ @Test
+ public void testStaticNoRoute() {
+ TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+ when(tableConfig.routeRegex()).thenReturn(Pattern.compile("val"));
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+
when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+ when(config.tableConfig(any())).thenReturn(tableConfig);
+ when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+ Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD, "foobar");
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testDynamicRoute() {
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+
when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.dynamicTablesEnabled()).thenReturn(true);
+ when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+ Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD,
TABLE_IDENTIFIER.toString());
+
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(1);
+ IcebergWriterResult writerResult = writerResults.get(0);
+ assertThat(writerResult.tableIdentifier()).isEqualTo(TABLE_IDENTIFIER);
+ }
+
+ @Test
+ public void testDynamicNoRoute() {
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+
when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.dynamicTablesEnabled()).thenReturn(true);
+ when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+ Map<String, Object> value = ImmutableMap.of(ROUTE_FIELD, "db.foobar");
+
+ List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+ assertThat(writerResults.size()).isEqualTo(0);
+ }
+
+ private List<IcebergWriterResult> sinkWriterTest(
+ Map<String, Object> value, IcebergSinkConfig config) {
+ IcebergWriterResult writeResult =
+ new IcebergWriterResult(
+ TableIdentifier.parse(TABLE_NAME),
+ ImmutableList.of(mock(DataFile.class)),
+ ImmutableList.of(),
+ Types.StructType.of());
+ IcebergWriter writer = mock(IcebergWriter.class);
+ when(writer.complete()).thenReturn(ImmutableList.of(writeResult));
+
+ IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class);
+ when(writerFactory.createWriter(any(), any(),
anyBoolean())).thenReturn(writer);
+
+ SinkWriter sinkWriter = new SinkWriter(catalog, config);
+
+ // save a record
+ Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+ SinkRecord rec =
+ new SinkRecord(
+ "topic",
+ 1,
+ null,
+ "key",
+ null,
+ value,
+ 100L,
+ now.toEpochMilli(),
+ TimestampType.LOG_APPEND_TIME);
+ sinkWriter.save(ImmutableList.of(rec));
+
+ SinkWriterResult result = sinkWriter.completeWrite();
+
+ Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1));
+ assertThat(offset).isNotNull();
+ assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than
current offset
+ assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));
+
+ return result.writerResults();
+ }
+}