http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java new file mode 100644 index 0000000..65bd9d0 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConvertingFutureCallback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * <p> + * Implementation of OffsetBackingStore that uses a Kafka topic to store offset data. + * </p> + * <p> + * Internally, this implementation both produces to and consumes from a Kafka topic which stores the offsets. + * It accepts producer and consumer overrides via its configuration but forces some settings to specific values + * to ensure correct behavior (e.g. acks, auto.offset.reset). 
+ * </p> + */ +public class KafkaOffsetBackingStore implements OffsetBackingStore { + private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class); + + public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic"; + + private KafkaBasedLog<byte[], byte[]> offsetLog; + private HashMap<ByteBuffer, ByteBuffer> data; + + @Override + public void configure(Map<String, ?> configs) { + String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG); + if (topic == null) + throw new ConnectException("Offset storage topic must be specified"); + + data = new HashMap<>(); + + Map<String, Object> producerProps = new HashMap<>(); + producerProps.putAll(configs); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + + Map<String, Object> consumerProps = new HashMap<>(); + consumerProps.putAll(configs); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback); + } + + @Override + public void start() { + log.info("Starting KafkaOffsetBackingStore"); + offsetLog.start(); + log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore"); + } + + @Override + public void stop() { + log.info("Stopping KafkaOffsetBackingStore"); + offsetLog.stop(); + log.info("Stopped KafkaOffsetBackingStore"); + } + + @Override + public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys, + final Callback<Map<ByteBuffer, ByteBuffer>> callback) { + ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) { + @Override + public Map<ByteBuffer, ByteBuffer> convert(Void result) { + Map<ByteBuffer, ByteBuffer> values = new HashMap<>(); + for (ByteBuffer key : keys) + values.put(key, data.get(key)); + return values; + } + }; + // This operation may be relatively (but not too) expensive since it always requires checking end offsets, even + // if we've already read up to the end. However, it also should not be common (offsets should only be read when + // resetting a task). Always requiring that we read to the end is simpler than trying to differentiate when it + // is safe not to (which should only be if we *know* we've maintained ownership since the last write). 
+ offsetLog.readToEnd(future); + return future; + } + + @Override + public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) { + SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback); + + for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) + offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback); + + return producerCallback; + } + + private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() { + @Override + public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) { + ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null; + ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null; + data.put(key, value); + } + }; + + private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, + Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) { + return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime()); + } + + private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> { + private int numLeft; + private boolean completed = false; + private Throwable exception = null; + private final Callback<Void> callback; + + public SetCallbackFuture(int numRecords, Callback<Void> callback) { + numLeft = numRecords; + this.callback = callback; + } + + @Override + public synchronized void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + if (!completed) { + this.exception = exception; + callback.onCompletion(exception, null); + completed = true; + // Use notifyAll() since more than one thread may be blocked in get() + this.notifyAll(); + } + return; + } + + numLeft -= 1; + if (numLeft == 0) { + callback.onCompletion(null, null); + completed = true; + this.notifyAll(); + } + } + + @Override + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public synchronized boolean isCancelled() { + return false; + } + + @Override + public synchronized boolean isDone() { + return completed; + } + + @Override + public synchronized Void get() throws InterruptedException, ExecutionException { + while (!completed) { + this.wait(); + } + if (exception != null) + throw new ExecutionException(exception); + return null; + } + + @Override + public synchronized Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long started = System.currentTimeMillis(); + long limit = started + unit.toMillis(timeout); + while (!completed) { + long leftMs = limit - System.currentTimeMillis(); + // Must time out when no time remains; wait(0) would block indefinitely + if (leftMs <= 0) + throw new TimeoutException("KafkaOffsetBackingStore Future timed out."); + this.wait(leftMs); + } + if (exception != null) + throw new ExecutionException(exception); + return null; + } + } + + +}
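For orientation, here is a minimal sketch of how this store can be exercised end to end. The broker address, topic name, and class name below are illustrative assumptions, and writing raw ByteBuffer keys directly (rather than going through OffsetStorageWriter's converter layer, as the runtime does) is purely for demonstration:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;

public class OffsetStoreSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        // Assumed broker address and topic name; configure() forwards all of these
        // to the internal producer and consumer, so client-level settings go here too.
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");

        KafkaOffsetBackingStore store = new KafkaOffsetBackingStore();
        store.configure(configs);
        store.start(); // blocks until the existing contents of the topic have been replayed

        ByteBuffer key = ByteBuffer.wrap("partition-a".getBytes(StandardCharsets.UTF_8));
        ByteBuffer value = ByteBuffer.wrap("offset-42".getBytes(StandardCharsets.UTF_8));

        // set() needs an explicit callback: SetCallbackFuture invokes it unconditionally.
        Callback<Void> noop = new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) { }
        };
        store.set(Collections.singletonMap(key, value), noop).get();

        // get() re-reads to the end of the log before answering, so the write is visible.
        Map<ByteBuffer, ByteBuffer> read = store.get(Collections.singleton(key), null).get();
        System.out.println(StandardCharsets.UTF_8.decode(read.get(key)));

        store.stop();
    }
}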
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java new file mode 100644 index 0000000..d62e38f --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.util.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this + * behaves similarly to a real backing store, operations are executed asynchronously on a + * background thread. 
+ */ +public class MemoryOffsetBackingStore implements OffsetBackingStore { + private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class); + + protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>(); + protected ExecutorService executor = Executors.newSingleThreadExecutor(); + + public MemoryOffsetBackingStore() { + + } + + @Override + public void configure(Map<String, ?> props) { + } + + @Override + public synchronized void start() { + } + + @Override + public synchronized void stop() { + // Nothing to do since this doesn't maintain any outstanding connections/data + } + + @Override + public Future<Map<ByteBuffer, ByteBuffer>> get( + final Collection<ByteBuffer> keys, + final Callback<Map<ByteBuffer, ByteBuffer>> callback) { + return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() { + @Override + public Map<ByteBuffer, ByteBuffer> call() throws Exception { + Map<ByteBuffer, ByteBuffer> result = new HashMap<>(); + synchronized (MemoryOffsetBackingStore.this) { + for (ByteBuffer key : keys) { + result.put(key, data.get(key)); + } + } + if (callback != null) + callback.onCompletion(null, result); + return result; + } + }); + + } + + @Override + public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, + final Callback<Void> callback) { + return executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + synchronized (MemoryOffsetBackingStore.this) { + for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) { + data.put(entry.getKey(), entry.getValue()); + } + save(); + } + if (callback != null) + callback.onCompletion(null, null); + return null; + } + }); + } + + // Hook to allow subclasses to persist data + protected void save() { + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java new file mode 100644 index 0000000..83fdb53 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.connect.util.Callback; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * <p> + * OffsetBackingStore is an interface for storage backends that store key-value data. 
The backing + * store doesn't need to handle serialization or deserialization. It only needs to support + * reading/writing bytes. Since it is expected these operations will require network + * operations, only bulk operations are supported. + * </p> + * <p> + * Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances + * that are associated with individual tasks, the caller must be sure keys include information about the + * connector so that the shared namespace does not result in conflicting keys. + * </p> + */ +public interface OffsetBackingStore extends Configurable { + + /** + * Start this offset store. + */ + public void start(); + + /** + * Stop the backing store. Implementations should attempt to shutdown gracefully, but not block + * indefinitely. + */ + public void stop(); + + /** + * Get the values for the specified keys + * @param keys list of keys to look up + * @param callback callback to invoke on completion + * @return future for the resulting map from key to value + */ + public Future<Map<ByteBuffer, ByteBuffer>> get( + Collection<ByteBuffer> keys, + Callback<Map<ByteBuffer, ByteBuffer>> callback); + + /** + * Set the specified keys and values. + * @param values map from key to value + * @param callback callback to invoke on completion + * @return void future for the operation + */ + public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, + Callback<Void> callback); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java new file mode 100644 index 0000000..23c1019 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is implemented + * directly, the interface is only separate from this implementation because it needs to be + * included in the public API package. 
+ */ +public class OffsetStorageReaderImpl implements OffsetStorageReader { + private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class); + + private final OffsetBackingStore backingStore; + private final String namespace; + private final Converter keyConverter; + private final Converter valueConverter; + + public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace, + Converter keyConverter, Converter valueConverter) { + this.backingStore = backingStore; + this.namespace = namespace; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + } + + @Override + public <T> Map<String, Object> offset(Map<String, T> partition) { + return offsets(Arrays.asList(partition)).get(partition); + } + + @Override + public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) { + // Serialize keys so backing store can work with them + Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size()); + for (Map<String, T> key : partitions) { + try { + // Offsets are treated as schemaless, their format is only validated here (and the returned value below) + OffsetUtils.validateFormat(key); + byte[] keySerialized = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, key)); + ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null; + serializedToOriginal.put(keyBuffer, key); + } catch (Throwable t) { + log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with " + + "namespace {}. No value for this data will be returned, which may break the " + + "task or cause it to skip some data.", namespace, t); + } + } + + // Get serialized key -> serialized value from backing store + Map<ByteBuffer, ByteBuffer> raw; + try { + raw = backingStore.get(serializedToOriginal.keySet(), null).get(); + } catch (Exception e) { + log.error("Failed to fetch offsets from namespace {}: ", namespace, e); + throw new ConnectException("Failed to fetch offsets.", e); + } + + // Deserialize all the values and map back to the original keys + Map<Map<String, T>, Map<String, Object>> result = new HashMap<>(partitions.size()); + for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) { + try { + // Since null could be a valid key, explicitly check whether map contains the key + if (!serializedToOriginal.containsKey(rawEntry.getKey())) { + log.error("Should be able to map {} back to a requested partition-offset key, backing " + + "store may have returned invalid data", rawEntry.getKey()); + continue; + } + Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey()); + SchemaAndValue deserializedSchemaAndValue = valueConverter.toConnectData(namespace, rawEntry.getValue() != null ? rawEntry.getValue().array() : null); + Object deserializedValue = deserializedSchemaAndValue.value(); + OffsetUtils.validateFormat(deserializedValue); + + result.put(origKey, (Map<String, Object>) deserializedValue); + } catch (Throwable t) { + log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with" + + " namespace {}. No value for this data will be returned, which may break the " + + "task or cause it to skip some data. 
This could either be due to an error in " + + "the connector implementation or incompatible schema.", namespace, t); + } + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java new file mode 100644 index 0000000..58376e5 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Future; + +/** + * <p> + * OffsetStorageWriter is a buffered writer that wraps the simple OffsetBackingStore interface. + * It maintains a copy of the key-value data in memory and buffers writes. It allows you to take + * a snapshot, which can then be asynchronously flushed to the backing store while new writes + * continue to be processed. This allows Kafka Connect to process offset commits in the background + * while continuing to process messages. + * </p> + * <p> + * Connect uses an OffsetStorage implementation to save state about the current progress of + * source (import to Kafka) jobs, which may have many input partitions and "offsets" may not be as + * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs + * because they can use Kafka's native offset storage (or the sink data store can handle offset + * storage to achieve exactly once semantics). + * </p> + * <p> + * Both partitions and offsets are generic data objects. This allows different connectors to use + * whatever representation they need, even arbitrarily complex records. These are translated + * internally into the serialized form the OffsetBackingStore uses. + * </p> + * <p> + * Note that this only provides write functionality. This is intentional to ensure stale data is + * never read. Offset data should only be read during startup or reconfiguration of a task. By + * always serving those requests by reading the values from the backing store, we ensure we never + * accidentally use stale data. 
(One example of how this can occur: a task is processing input + * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere; + * reconfiguration causes partition A to be reassigned to this node, but now the offset data is out + * of date). Since these offsets are created and managed by the connector itself, there's no way + * for the offset management layer to know which keys are "owned" by which tasks at any given + * time. + * </p> + * <p> + * This class is not thread-safe. It should only be accessed from a Task's processing thread. + * </p> + */ +public class OffsetStorageWriter { + private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class); + + private final OffsetBackingStore backingStore; + private final Converter keyConverter; + private final Converter valueConverter; + private final String namespace; + // Offset data in Connect format + private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>(); + + // Not synchronized, should only be accessed by flush thread + private Map<Map<String, Object>, Map<String, Object>> toFlush = null; + // Unique ID for each flush request to handle callbacks after timeouts + private long currentFlushId = 0; + + public OffsetStorageWriter(OffsetBackingStore backingStore, + String namespace, Converter keyConverter, Converter valueConverter) { + this.backingStore = backingStore; + this.namespace = namespace; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + } + + /** + * Set an offset for a partition using Connect data values + * @param partition the partition to store an offset for + * @param offset the offset + */ + public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) { + data.put((Map<String, Object>) partition, (Map<String, Object>) offset); + } + + private boolean flushing() { + return toFlush != null; + } + + /** + * Performs the first step of a flush operation, snapshotting the current state. This does not + * actually initiate the flush with the underlying storage. + * + * @return true if a flush was initiated, false if no data was available + */ + public synchronized boolean beginFlush() { + if (flushing()) { + log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the " + + "framework should not allow this"); + throw new ConnectException("OffsetStorageWriter is already flushing"); + } + + if (data.isEmpty()) + return false; + + assert !flushing(); + toFlush = data; + data = new HashMap<>(); + return true; + } + + /** + * Flush the current offsets and clear them from this writer. This is non-blocking: it + * moves the current set of offsets out of the way, serializes the data, and asynchronously + * writes the data to the backing store. If no offsets need to be written, the callback is + * still invoked, but no Future is returned. + * + * @return a Future, or null if there are no offsets to commit + */ + public Future<Void> doFlush(final Callback<Void> callback) { + final long flushId = currentFlushId; + + // Serialize + Map<ByteBuffer, ByteBuffer> offsetsSerialized; + try { + offsetsSerialized = new HashMap<>(); + for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) { + // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate + // for that data. The only enforcement of the format is here.
+ OffsetUtils.validateFormat(entry.getKey()); + OffsetUtils.validateFormat(entry.getValue()); + // When serializing the key, we add in the namespace information so the key is [namespace, real key] + byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey())); + ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; + byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue()); + ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null; + offsetsSerialized.put(keyBuffer, valueBuffer); + } + } catch (Throwable t) { + // Must handle errors properly here or the writer will be left mid-flush forever and be + // unable to make progress. + log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit " + + "offsets under namespace {}. This likely won't recover unless the " + + "unserializable partition or offset information is overwritten.", namespace); + log.error("Cause of serialization failure:", t); + callback.onCompletion(t, null); + return null; + } + + // And submit the data + log.debug("Submitting {} entries to backing store", offsetsSerialized.size()); + return backingStore.set(offsetsSerialized, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + boolean isCurrent = handleFinishWrite(flushId, error, result); + if (isCurrent && callback != null) + callback.onCompletion(error, result); + } + }); + } + + /** + * Cancel a flush that has been initiated by {@link #beginFlush}. This should not be called if + * {@link #doFlush} has already been invoked. It should be used if an operation performed + * between beginFlush and doFlush failed. + */ + public synchronized void cancelFlush() { + // Verify we're still flushing data to handle a race between cancelFlush() calls from up the + // call stack and callbacks from the write request to underlying storage + if (flushing()) { + // Just recombine the data and place it back in the primary storage + toFlush.putAll(data); + data = toFlush; + currentFlushId++; + toFlush = null; + } + } + + /** + * Handle completion of a write. Returns true if this callback is for the current flush + * operation, false if it's for an old operation that should now be ignored. + */ + private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) { + // Callbacks need to be handled carefully since the flush operation may have already timed + // out and been cancelled. + if (flushId != currentFlushId) + return false; + + if (error != null) { + cancelFlush(); + } else { + currentFlushId++; + toFlush = null; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java new file mode 100644 index 0000000..f31715a --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.DataException; + +import java.util.Map; + +public class OffsetUtils { + public static void validateFormat(Object offsetData) { + if (offsetData == null) + return; + + if (!(offsetData instanceof Map)) + throw new DataException("Offsets must be specified as a Map"); + validateFormat((Map<Object, Object>) offsetData); + } + + public static <K, V> void validateFormat(Map<K, V> offsetData) { + // Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate + // that there's no usable concept of offsets in the source system. + if (offsetData == null) + return; + + for (Map.Entry<K, V> entry : offsetData.entrySet()) { + if (!(entry.getKey() instanceof String)) + throw new DataException("Offsets may only use String keys"); + + Object value = entry.getValue(); + if (value == null) + continue; + Schema.Type schemaType = ConnectSchema.schemaType(value.getClass()); + if (!schemaType.isPrimitive()) + throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java new file mode 100644 index 0000000..0db2ce2 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +/** + * Generic interface for callbacks + */ +public interface Callback<V> { + /** + * Invoked upon completion of the operation.
+ * + * @param error the error that caused the operation to fail, or null if no error occurred + * @param result the return value, or null if the operation failed + */ + void onCompletion(Throwable error, V result); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java new file mode 100644 index 0000000..8176d82 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; + +/** + * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within + * the connector. + */ +public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> { + private final String connector; + private final int task; + + @JsonCreator + public ConnectorTaskId(@JsonProperty("connector") String connector, @JsonProperty("task") int task) { + this.connector = connector; + this.task = task; + } + + @JsonProperty + public String connector() { + return connector; + } + + @JsonProperty + public int task() { + return task; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ConnectorTaskId that = (ConnectorTaskId) o; + + if (task != that.task) + return false; + if (connector != null ? !connector.equals(that.connector) : that.connector != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = connector != null ? 
connector.hashCode() : 0; + result = 31 * result + task; + return result; + } + + @Override + public String toString() { + return connector + '-' + task; + } + + @Override + public int compareTo(ConnectorTaskId o) { + int connectorCmp = connector.compareTo(o.connector); + if (connectorCmp != 0) + return connectorCmp; + return ((Integer) task).compareTo(o.task); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java new file mode 100644 index 0000000..88bc9a1 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> { + + private Callback<T> underlying; + private CountDownLatch finishedLatch; + private T result = null; + private Throwable exception = null; + + public ConvertingFutureCallback(Callback<T> underlying) { + this.underlying = underlying; + this.finishedLatch = new CountDownLatch(1); + } + + public abstract T convert(U result); + + @Override + public void onCompletion(Throwable error, U result) { + this.exception = error; + this.result = convert(result); + if (underlying != null) + underlying.onCompletion(error, this.result); + finishedLatch.countDown(); + } + + @Override + public boolean cancel(boolean b) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return finishedLatch.getCount() == 0; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + finishedLatch.await(); + return result(); + } + + @Override + public T get(long l, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + if (!finishedLatch.await(l, timeUnit)) + throw new TimeoutException("Timed out waiting for future"); + return result(); + } + + private T result() throws ExecutionException { + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } +} + 
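To make the adapter pattern in ConvertingFutureCallback concrete: the callback completes with one type (U) while callers block on a Future of another (T); KafkaOffsetBackingStore.get() uses this to turn a Void read-to-end completion into a map of values. Below is a small hypothetical usage, with the String-to-Integer conversion and class name chosen purely for illustration:

import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConvertingFutureCallback;

public class ConvertingFutureCallbackExample {
    public static void main(String[] args) throws Exception {
        // The underlying callback observes the converted (Integer) result.
        Callback<Integer> underlying = new Callback<Integer>() {
            @Override
            public void onCompletion(Throwable error, Integer result) {
                System.out.println("converted result: " + result);
            }
        };

        // Completes with a raw String (U) but exposes a Future<Integer> (T).
        ConvertingFutureCallback<String, Integer> future =
                new ConvertingFutureCallback<String, Integer>(underlying) {
                    @Override
                    public Integer convert(String raw) {
                        return raw == null ? null : raw.length();
                    }
                };

        // Normally an asynchronous operation would invoke this on completion.
        future.onCompletion(null, "hello");

        System.out.println("future.get() = " + future.get()); // prints 5
    }
}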
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java new file mode 100644 index 0000000..5b0522a --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +public class FutureCallback<T> extends ConvertingFutureCallback<T, T> { + + public FutureCallback(Callback<T> underlying) { + super(underlying); + } + + public FutureCallback() { + super(null); + } + + @Override + public T convert(T result) { + return result; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java new file mode 100644 index 0000000..3b37076 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.util; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Future; + +/** + * <p> + * KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all + * clients need to consume and, at times, agree on their offset / that they have read to the end of the log. + * </p> + * <p> + * This functionality is useful for storing different types of data that all clients may need to agree on -- + * offsets or config for example. This class runs a consumer in a background thread to continuously tail the target + * topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful + * utilities like checking the current log end offset and waiting until the current end of the log is reached. + * </p> + * <p> + * To support different use cases, this class works with either single- or multi-partition topics. + * </p> + * <p> + * Since this class is generic, it delegates the details of data storage via a callback that is invoked for each + * record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the + * calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked + * and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required. + * </p> + */ +public class KafkaBasedLog<K, V> { + private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class); + private static final long CREATE_TOPIC_TIMEOUT_MS = 30000; + + private Time time; + private final String topic; + private final Map<String, Object> producerConfigs; + private final Map<String, Object> consumerConfigs; + private final Callback<ConsumerRecord<K, V>> consumedCallback; + private Consumer<K, V> consumer; + private Producer<K, V> producer; + + private Thread thread; + private boolean stopRequested; + private Queue<Callback<Void>> readLogEndOffsetCallbacks; + + /** + * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until + * {@link #start()} is invoked. + * + * @param topic the topic to treat as a log + * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must + * contain compatible serializer settings for the generic types used on this class. 
Some + * settings, such as the number of acks, will be overridden to ensure correct behavior of this + * class. + * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must + * contain compatible deserializer settings for the generic types used on this class. Some + * settings, such as the auto offset reset policy, will be overridden to ensure correct + * behavior of this class. + * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log + * @param time Time interface + */ + public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, + Callback<ConsumerRecord<K, V>> consumedCallback, Time time) { + this.topic = topic; + this.producerConfigs = producerConfigs; + this.consumerConfigs = consumerConfigs; + this.consumedCallback = consumedCallback; + this.stopRequested = false; + this.readLogEndOffsetCallbacks = new ArrayDeque<>(); + this.time = time; + } + + public void start() { + log.info("Starting KafkaBasedLog with topic " + topic); + + producer = createProducer(); + consumer = createConsumer(); + + List<TopicPartition> partitions = new ArrayList<>(); + + // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing, + // we rely on topic auto-creation + List<PartitionInfo> partitionInfos = null; + long started = time.milliseconds(); + while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) { + partitionInfos = consumer.partitionsFor(topic); + Utils.sleep(Math.min(time.milliseconds() - started, 1000)); + } + if (partitionInfos == null) + throw new ConnectException("Could not look up partition metadata for offset backing store topic in" + " the allotted period. This could indicate a connectivity issue, unavailable topic partitions, or, if" + " this is your first use of the topic, it may have taken too long to create."); + + for (PartitionInfo partition : partitionInfos) + partitions.add(new TopicPartition(partition.topic(), partition.partition())); + consumer.assign(partitions); + + readToLogEnd(); + + thread = new WorkThread(); + thread.start(); + + log.info("Finished reading KafkaBasedLog for topic " + topic); + + log.info("Started KafkaBasedLog for topic " + topic); + } + + public void stop() { + log.info("Stopping KafkaBasedLog for topic " + topic); + + synchronized (this) { + stopRequested = true; + } + consumer.wakeup(); + + try { + thread.join(); + } catch (InterruptedException e) { + throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " + + "down its producer and consumer.", e); + } + + try { + producer.close(); + } catch (KafkaException e) { + log.error("Failed to stop KafkaBasedLog producer", e); + } + + try { + consumer.close(); + } catch (KafkaException e) { + log.error("Failed to stop KafkaBasedLog consumer", e); + } + + log.info("Stopped KafkaBasedLog for topic " + topic); + } + + /** + * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback. + * Note that this checks the current end offsets, reads up to them, and invokes the callback regardless of whether + * additional records have been written to the log. If the caller needs to ensure they have truly reached the end + * of the log, they must ensure there are no other writers during this period. + * + * This waits until the end of all partitions has been reached. + * + * This method is asynchronous. 
If you need a synchronous version, pass an instance of + * {@link org.apache.kafka.connect.util.FutureCallback} as the {@code callback} parameter and wait on it to block. + * + * @param callback the callback to invoke once the end of the log has been reached. + */ + public void readToEnd(Callback<Void> callback) { + producer.flush(); + synchronized (this) { + readLogEndOffsetCallbacks.add(callback); + } + consumer.wakeup(); + } + + /** + * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback. + * @return the future associated with the operation + */ + public Future<Void> readToEnd() { + FutureCallback<Void> future = new FutureCallback<>(null); + readToEnd(future); + return future; + } + + public void send(K key, V value) { + send(key, value, null); + } + + public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) { + producer.send(new ProducerRecord<>(topic, key, value), callback); + } + + + private Producer<K, V> createProducer() { + // Always require acks of "all" from the producer to ensure durable writes + producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all"); + return new KafkaProducer<>(producerConfigs); + } + + private Consumer<K, V> createConsumer() { + // Always force reset to the beginning of the log since this class wants to consume all available log data + consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(consumerConfigs); + } + + private void poll(long timeoutMs) { + try { + ConsumerRecords<K, V> records = consumer.poll(timeoutMs); + for (ConsumerRecord<K, V> record : records) + consumedCallback.onCompletion(null, record); + } catch (WakeupException e) { + // Expected on get() or stop(). The calling code should handle this + throw e; + } catch (KafkaException e) { + // Pass the exception itself so the full stack trace is logged + log.error("Error polling", e); + } + } + + private void readToLogEnd() { + log.trace("Reading to end of offset log"); + + Set<TopicPartition> assignment = consumer.assignment(); + + // This approach to getting the current end offset is hacky until we have an API for looking these up directly + Map<TopicPartition, Long> offsets = new HashMap<>(); + for (TopicPartition tp : assignment) { + long offset = consumer.position(tp); + offsets.put(tp, offset); + consumer.seekToEnd(tp); + } + + Map<TopicPartition, Long> endOffsets = new HashMap<>(); + try { + poll(0); + } finally { + // If there is an exception, even a possibly expected one like WakeupException, we need to make sure + // the consumer's position is reset or it will get into an inconsistent state. 
+ for (TopicPartition tp : assignment) { + long startOffset = offsets.get(tp); + long endOffset = consumer.position(tp); + if (endOffset > startOffset) { + endOffsets.put(tp, endOffset); + consumer.seek(tp, startOffset); + } + log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset); + } + } + + while (!endOffsets.isEmpty()) { + poll(Integer.MAX_VALUE); + + Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<TopicPartition, Long> entry = it.next(); + if (consumer.position(entry.getKey()) >= entry.getValue()) + it.remove(); + else + break; + } + } + } + + + private class WorkThread extends Thread { + @Override + public void run() { + try { + while (true) { + int numCallbacks; + synchronized (KafkaBasedLog.this) { + if (stopRequested) + break; + numCallbacks = readLogEndOffsetCallbacks.size(); + } + + if (numCallbacks > 0) { + try { + readToLogEnd(); + } catch (WakeupException e) { + // Either received another get() call and need to retry reading to end of log or stop() was + // called. Both are handled by restarting this loop. + continue; + } + } + + synchronized (KafkaBasedLog.this) { + // Only invoke exactly the number of callbacks we found before triggering the read to log end + // since it is possible for another write + readToEnd to sneak in in the meantime + for (int i = 0; i < numCallbacks; i++) { + Callback<Void> cb = readLogEndOffsetCallbacks.poll(); + cb.onCompletion(null, null); + } + } + + try { + poll(Integer.MAX_VALUE); + } catch (WakeupException e) { + // See previous comment, both possible causes of this wakeup are handled by starting this loop again + continue; + } + } + } catch (Throwable t) { + log.error("Unexpected exception in KafkaBasedLog's work thread", t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java new file mode 100644 index 0000000..01dac90 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * <p> + * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown, + * a flag is set that the thread should detect so it can exit gracefully. In forcible + * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit + * gracefully, but then force it to exit if it takes too long. + * </p> + * <p> + * Implementations should override the {@link #execute} method and check {@link #getRunning} to + * determine whether they should try to gracefully exit. + * </p> + */ +public abstract class ShutdownableThread extends Thread { + private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class); + + private AtomicBoolean isRunning = new AtomicBoolean(true); + private CountDownLatch shutdownLatch = new CountDownLatch(1); + + /** + * An UncaughtExceptionHandler to register on every instance of this class. This is useful for + * testing, where assertion failures in the thread would not otherwise cause the test to fail. + * Since one instance is used for all threads, it must be thread-safe. + */ + public static volatile UncaughtExceptionHandler uncaughtExceptionHandler = null; + + public ShutdownableThread(String name) { + // The default is daemon=true so that these threads will not prevent shutdown. We use this + // default because these threads may be running user code that does not clean up properly, + // even when we attempt to forcibly shut them down. + this(name, true); + } + + public ShutdownableThread(String name, boolean daemon) { + super(name); + this.setDaemon(daemon); + if (uncaughtExceptionHandler != null) + this.setUncaughtExceptionHandler(uncaughtExceptionHandler); + } + + /** + * Implementations should override this method with the main body for the thread. + */ + public abstract void execute(); + + /** + * Returns true if the thread hasn't exited yet and none of the shutdown methods have been + * invoked. + */ + public boolean getRunning() { + return isRunning.get(); + } + + @Override + public void run() { + try { + execute(); + } catch (Error | RuntimeException e) { + log.error("Thread {} exiting with uncaught exception: ", getName(), e); + throw e; + } finally { + shutdownLatch.countDown(); + } + } + + /** + * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then + * forcibly interrupting the thread. + * @param gracefulTimeout the maximum time to wait for a graceful exit + * @param unit the time unit of the timeout argument + */ + public void shutdown(long gracefulTimeout, TimeUnit unit) + throws InterruptedException { + boolean success = gracefulShutdown(gracefulTimeout, unit); + if (!success) + forceShutdown(); + } + + /** + * Attempt graceful shutdown + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return true if successful, false if the timeout elapsed + */ + public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException { + startGracefulShutdown(); + return awaitShutdown(timeout, unit); + } + + /** + * Start shutting down this thread gracefully, but do not block waiting for it to exit. 
+     */
+    public void startGracefulShutdown() {
+        log.info("Starting graceful shutdown of thread {}", getName());
+        isRunning.set(false);
+    }
+
+    /**
+     * Awaits shutdown of this thread, waiting up to the timeout.
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return true if successful, false if the timeout elapsed
+     * @throws InterruptedException if interrupted while waiting for the thread to exit
+     */
+    public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
+        return shutdownLatch.await(timeout, unit);
+    }
+
+    /**
+     * Immediately tries to force the thread to shut down by interrupting it. This does not try to
+     * wait for the thread to truly exit because forcible shutdown is not always possible. By
+     * default, threads are marked as daemon threads so they will not prevent the process from
+     * exiting.
+     */
+    public void forceShutdown() throws InterruptedException {
+        log.info("Forcing shutdown of thread {}", getName());
+        isRunning.set(false);
+        interrupt();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
new file mode 100644
index 0000000..62ec5a8
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.MockTime;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WorkerSinkTask.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerSinkTaskTest {
+    // These are fixed to keep this code simpler.
+    // In this example we assume raw byte[] keys and values that convert to a mix of
+    // integer/string data in the Connect format.
+    private static final String TOPIC = "test";
+    private static final int PARTITION = 12;
+    private static final int PARTITION2 = 13;
+    private static final long FIRST_OFFSET = 45;
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+    private static final int KEY = 12;
+    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
+    private static final String VALUE = "VALUE";
+    private static final byte[] RAW_KEY = "key".getBytes();
+    private static final byte[] RAW_VALUE = "value".getBytes();
+
+    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+    private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+    }
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private Time time;
+    private WorkerSinkTask workerTask;
+    @Mock
+    private SinkTask sinkTask;
+    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+    private WorkerConfig workerConfig;
+    @Mock
+    private Converter keyConverter;
+    @Mock
+    private Converter valueConverter;
+    @Mock
+    private WorkerSinkTaskThread workerThread;
+    @Mock
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+
+    private long recordsReturned;
+
+    @Before
+    public void setUp() {
+        time = new MockTime();
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerConfig = new StandaloneConfig(workerProps);
+        workerTask = PowerMock.createPartialMock(
+                WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
+                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+
+        recordsReturned = 0;
+    }
+
+    @Test
+    public void testPollRedelivery() throws Exception {
+        expectInitializeTask();
+
+        // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+        sinkTask.put(EasyMock.capture(records));
+        EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
+        // Pause
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+        consumer.pause(TOPIC_PARTITION);
+        PowerMock.expectLastCall();
+        consumer.pause(TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
+        // Retry delivery should succeed
+        expectConsumerPoll(0);
+        sinkTask.put(EasyMock.capture(records));
+        EasyMock.expectLastCall();
+        // And unpause
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+        consumer.resume(TOPIC_PARTITION);
+        PowerMock.expectLastCall();
+        consumer.resume(TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
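+        // Replay the mocks and drive two poll() cycles: the first put() throws a
+        // RetriableException and pauses the consumer; the second redelivers the batch and resumes.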
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        workerTask.poll(Long.MAX_VALUE);
+        workerTask.poll(Long.MAX_VALUE);
+
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectInitializeTask() throws Exception {
+        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+        PowerMock.expectPrivate(workerTask, "createWorkerThread")
+                .andReturn(workerThread);
+        workerThread.start();
+        PowerMock.expectLastCall();
+
+        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+            @Override
+            public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+                return ConsumerRecords.empty();
+            }
+        });
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+        PowerMock.expectLastCall();
+        sinkTask.start(TASK_PROPS);
+        PowerMock.expectLastCall();
+    }
+
+    private void expectConsumerPoll(final int numMessages) {
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+                        for (int i = 0; i < numMessages; i++)
+                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE));
+                        recordsReturned += numMessages;
+                        return new ConsumerRecords<>(
+                                numMessages > 0 ?
+                                        Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
+                                        Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap()
+                        );
+                    }
+                });
+    }
+
+    private void expectConvertMessages(final int numMessages) {
+        EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
+        EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
+    }
+}
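For reference, a minimal sketch of how the ShutdownableThread API added above is meant to be used. The PollingThread subclass, its name, and the sleep-based work loop are illustrative stand-ins only, not part of this patch:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.util.ShutdownableThread;

public class PollingThread extends ShutdownableThread {
    public PollingThread() {
        super("polling-thread");
    }

    @Override
    public void execute() {
        // Keep working until startGracefulShutdown()/shutdown() clears the running flag.
        while (getRunning()) {
            try {
                Thread.sleep(100); // stand-in for one unit of real work
            } catch (InterruptedException e) {
                return; // forceShutdown() interrupts the thread; exit immediately
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PollingThread t = new PollingThread();
        t.start();
        // Give the thread up to five seconds to exit gracefully, then interrupt it.
        t.shutdown(5, TimeUnit.SECONDS);
    }
}

Because the single-argument constructor defaults to daemon=true, a thread like this will not keep the JVM alive even if it ignores both shutdown signals.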