http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java deleted file mode 100644 index b270368..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.ConvertingFutureCallback; -import org.apache.kafka.copycat.util.KafkaBasedLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * <p> - * Implementation of OffsetBackingStore that uses a Kafka topic to store offset data. - * </p> - * <p> - * Internally, this implementation both produces to and consumes from a Kafka topic which stores the offsets. - * It accepts producer and consumer overrides via its configuration but forces some settings to specific values - * to ensure correct behavior (e.g. acks, auto.offset.reset). 
- * </p> - */ -public class KafkaOffsetBackingStore implements OffsetBackingStore { - private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class); - - public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic"; - - private KafkaBasedLog<byte[], byte[]> offsetLog; - private HashMap<ByteBuffer, ByteBuffer> data; - - @Override - public void configure(Map<String, ?> configs) { - String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG); - if (topic == null) - throw new CopycatException("Offset storage topic must be specified"); - - data = new HashMap<>(); - - Map<String, Object> producerProps = new HashMap<>(); - producerProps.putAll(configs); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); - - Map<String, Object> consumerProps = new HashMap<>(); - consumerProps.putAll(configs); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - - offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback); - } - - @Override - public void start() { - log.info("Starting KafkaOffsetBackingStore"); - offsetLog.start(); - log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore"); - } - - @Override - public void stop() { - log.info("Stopping KafkaOffsetBackingStore"); - offsetLog.stop(); - log.info("Stopped KafkaOffsetBackingStore"); - } - - @Override - public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys, - final Callback<Map<ByteBuffer, ByteBuffer>> callback) { - ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) { - @Override - public Map<ByteBuffer, ByteBuffer> convert(Void result) { - Map<ByteBuffer, ByteBuffer> values = new HashMap<>(); - for (ByteBuffer key : keys) - values.put(key, data.get(key)); - return values; - } - }; - // This operation may be relatively (but not too) expensive since it always requires checking end offsets, even - // if we've already read up to the end. However, it also should not be common (offsets should only be read when - // resetting a task). Always requiring that we read to the end is simpler than trying to differentiate when it - // is safe not to (which should only be if we *know* we've maintained ownership since the last write). 
- offsetLog.readToEnd(future); - return future; - } - - @Override - public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) { - SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback); - - for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) - offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback); - - return producerCallback; - } - - private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() { - @Override - public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) { - ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null; - ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null; - data.put(key, value); - } - }; - - private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, - Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) { - return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime()); - } - - private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> { - private int numLeft; - private boolean completed = false; - private Throwable exception = null; - private final Callback<Void> callback; - - public SetCallbackFuture(int numRecords, Callback<Void> callback) { - numLeft = numRecords; - this.callback = callback; - } - - @Override - public synchronized void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - if (!completed) { - this.exception = exception; - callback.onCompletion(exception, null); - completed = true; - this.notify(); - } - return; - } - - numLeft -= 1; - if (numLeft == 0) { - callback.onCompletion(null, null); - completed = true; - this.notify(); - } - } - - @Override - public synchronized boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public synchronized boolean isCancelled() { - return false; - } - - @Override - public synchronized boolean isDone() { - return completed; - } - - @Override - public synchronized Void get() throws InterruptedException, ExecutionException { - while (!completed) { - this.wait(); - } - if (exception != null) - throw new ExecutionException(exception); - return null; - } - - @Override - public synchronized Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - long started = System.currentTimeMillis(); - long limit = started + unit.toMillis(timeout); - while (!completed) { - long leftMs = limit - System.currentTimeMillis(); - if (leftMs < 0) - throw new TimeoutException("KafkaOffsetBackingStore Future timed out."); - this.wait(leftMs); - } - if (exception != null) - throw new ExecutionException(exception); - return null; - } - } - - -}
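For reference, here is a minimal, hypothetical sketch (not part of this commit) of how the deleted KafkaOffsetBackingStore could be driven on its own. The broker address (localhost:9092), the topic name (copycat-offsets), and the key/value bytes are assumptions made purely for illustration; only the configure/start/set/get/stop API shown above is taken from the class itself.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
import org.apache.kafka.copycat.util.Callback;

public class KafkaOffsetBackingStoreExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");  // assumed broker address, passed through to the internal producer and consumer
        configs.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, "copycat-offsets");  // assumed topic name

        KafkaOffsetBackingStore store = new KafkaOffsetBackingStore();
        store.configure(configs);
        store.start();  // blocks until the offsets topic has been read to its current end

        ByteBuffer key = ByteBuffer.wrap("task-0".getBytes(StandardCharsets.UTF_8));
        ByteBuffer value = ByteBuffer.wrap("{\"position\": 42}".getBytes(StandardCharsets.UTF_8));

        // set() is asynchronous; waiting on the returned future ensures every record was acked (acks=all is forced)
        store.set(Collections.singletonMap(key, value), new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) { /* no-op */ }
        }).get();

        // get() issues a readToEnd() against the log before answering, so it observes the write above
        Map<ByteBuffer, ByteBuffer> read = store.get(Collections.singleton(key), null).get();
        System.out.println(StandardCharsets.UTF_8.decode(read.get(key)));

        store.stop();
    }
}

The design point worth noting is that reads are deliberately expensive: get() always re-reads to the end of the log rather than trusting the in-memory map, which keeps the semantics simple when task ownership may have changed.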
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java deleted file mode 100644 index 11a1b89..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.copycat.util.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** - * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this - * behaves similarly to a real backing store, operations are executed asynchronously on a - * background thread. 
- */ -public class MemoryOffsetBackingStore implements OffsetBackingStore { - private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class); - - protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>(); - protected ExecutorService executor = Executors.newSingleThreadExecutor(); - - public MemoryOffsetBackingStore() { - - } - - @Override - public void configure(Map<String, ?> props) { - } - - @Override - public synchronized void start() { - } - - @Override - public synchronized void stop() { - // Nothing to do since this doesn't maintain any outstanding connections/data - } - - @Override - public Future<Map<ByteBuffer, ByteBuffer>> get( - final Collection<ByteBuffer> keys, - final Callback<Map<ByteBuffer, ByteBuffer>> callback) { - return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() { - @Override - public Map<ByteBuffer, ByteBuffer> call() throws Exception { - Map<ByteBuffer, ByteBuffer> result = new HashMap<>(); - synchronized (MemoryOffsetBackingStore.this) { - for (ByteBuffer key : keys) { - result.put(key, data.get(key)); - } - } - if (callback != null) - callback.onCompletion(null, result); - return result; - } - }); - - } - - @Override - public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, - final Callback<Void> callback) { - return executor.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - synchronized (MemoryOffsetBackingStore.this) { - for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) { - data.put(entry.getKey(), entry.getValue()); - } - save(); - } - if (callback != null) - callback.onCompletion(null, null); - return null; - } - }); - } - - // Hook to allow subclasses to persist data - protected void save() { - - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java deleted file mode 100644 index 239d9a8..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.common.Configurable; -import org.apache.kafka.copycat.util.Callback; - -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.Future; - -/** - * <p> - * OffsetBackingStore is an interface for storage backends that store key-value data. 
The backing - * store doesn't need to handle serialization or deserialization. It only needs to support - * reading/writing bytes. Since it is expected these operations will require network - * operations, only bulk operations are supported. - * </p> - * <p> - * Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances - * that are associated with individual tasks, the caller must be sure keys include information about the - * connector so that the shared namespace does not result in conflicting keys. - * </p> - */ -public interface OffsetBackingStore extends Configurable { - - /** - * Start this offset store. - */ - public void start(); - - /** - * Stop the backing store. Implementations should attempt to shutdown gracefully, but not block - * indefinitely. - */ - public void stop(); - - /** - * Get the values for the specified keys - * @param keys list of keys to look up - * @param callback callback to invoke on completion - * @return future for the resulting map from key to value - */ - public Future<Map<ByteBuffer, ByteBuffer>> get( - Collection<ByteBuffer> keys, - Callback<Map<ByteBuffer, ByteBuffer>> callback); - - /** - * Set the specified keys and values. - * @param values map from key to value - * @param callback callback to invoke on completion - * @return void future for the operation - */ - public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, - Callback<Void> callback); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java deleted file mode 100644 index 84229a5..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.apache.kafka.copycat.errors.CopycatException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -/** - * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is implemented - * directly, the interface is only separate from this implementation because it needs to be - * included in the public API package. 
- */ -public class OffsetStorageReaderImpl implements OffsetStorageReader { - private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class); - - private final OffsetBackingStore backingStore; - private final String namespace; - private final Converter keyConverter; - private final Converter valueConverter; - - public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace, - Converter keyConverter, Converter valueConverter) { - this.backingStore = backingStore; - this.namespace = namespace; - this.keyConverter = keyConverter; - this.valueConverter = valueConverter; - } - - @Override - public <T> Map<String, Object> offset(Map<String, T> partition) { - return offsets(Arrays.asList(partition)).get(partition); - } - - @Override - public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) { - // Serialize keys so backing store can work with them - Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size()); - for (Map<String, T> key : partitions) { - try { - // Offsets are treated as schemaless, their format is only validated here (and the returned value below) - OffsetUtils.validateFormat(key); - byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, Arrays.asList(namespace, key)); - ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null; - serializedToOriginal.put(keyBuffer, key); - } catch (Throwable t) { - log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with " - + "namespace {}. No value for this data will be returned, which may break the " - + "task or cause it to skip some data.", namespace, t); - } - } - - // Get serialized key -> serialized value from backing store - Map<ByteBuffer, ByteBuffer> raw; - try { - raw = backingStore.get(serializedToOriginal.keySet(), null).get(); - } catch (Exception e) { - log.error("Failed to fetch offsets from namespace {}: ", namespace, e); - throw new CopycatException("Failed to fetch offsets.", e); - } - - // Deserialize all the values and map back to the original keys - Map<Map<String, T>, Map<String, Object>> result = new HashMap<>(partitions.size()); - for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) { - try { - // Since null could be a valid key, explicitly check whether map contains the key - if (!serializedToOriginal.containsKey(rawEntry.getKey())) { - log.error("Should be able to map {} back to a requested partition-offset key, backing " - + "store may have returned invalid data", rawEntry.getKey()); - continue; - } - Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey()); - SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue() != null ? rawEntry.getValue().array() : null); - Object deserializedValue = deserializedSchemaAndValue.value(); - OffsetUtils.validateFormat(deserializedValue); - - result.put(origKey, (Map<String, Object>) deserializedValue); - } catch (Throwable t) { - log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with" - + " namespace {}. No value for this data will be returned, which may break the " - + "task or cause it to skip some data. 
This could either be due to an error in " - + "the connector implementation or incompatible schema.", namespace, t); - } - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java deleted file mode 100644 index 59c12a7..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.util.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.Future; - -/** - * <p> - * OffsetStorageWriter is a buffered writer that wraps the simple OffsetBackingStore interface. - * It maintains a copy of the key-value data in memory and buffers writes. It allows you to take - * a snapshot, which can then be asynchronously flushed to the backing store while new writes - * continue to be processed. This allows Copycat to process offset commits in the background - * while continuing to process messages. - * </p> - * <p> - * Copycat uses an OffsetStorage implementation to save state about the current progress of - * source (import to Kafka) jobs, which may have many input partitions and "offsets" may not be as - * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs - * because they can use Kafka's native offset storage (or the sink data store can handle offset - * storage to achieve exactly once semantics). - * </p> - * <p> - * Both partitions and offsets are generic data objects. This allows different connectors to use - * whatever representation they need, even arbitrarily complex records. These are translated - * internally into the serialized form the OffsetBackingStore uses. - * </p> - * <p> - * Note that this only provides write functionality. This is intentional to ensure stale data is - * never read. Offset data should only be read during startup or reconfiguration of a task. By - * always serving those requests by reading the values from the backing store, we ensure we never - * accidentally use stale data. 
(One example of how this can occur: a task is processing input - * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere; - * reconfiguration causes partition A to be reassigned to this node, but now the offset data is out - * of date). Since these offsets are created and managed by the connector itself, there's no way - * for the offset management layer to know which keys are "owned" by which tasks at any given - * time. - * </p> - * <p> - * This class is not thread-safe. It should only be accessed from a Task's processing thread. - * </p> - */ -public class OffsetStorageWriter { - private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class); - - private final OffsetBackingStore backingStore; - private final Converter keyConverter; - private final Converter valueConverter; - private final String namespace; - // Offset data in Copycat format - private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>(); - - // Not synchronized, should only be accessed by flush thread - private Map<Map<String, Object>, Map<String, Object>> toFlush = null; - // Unique ID for each flush request to handle callbacks after timeouts - private long currentFlushId = 0; - - public OffsetStorageWriter(OffsetBackingStore backingStore, - String namespace, Converter keyConverter, Converter valueConverter) { - this.backingStore = backingStore; - this.namespace = namespace; - this.keyConverter = keyConverter; - this.valueConverter = valueConverter; - } - - /** - * Set an offset for a partition using Copycat data values - * @param partition the partition to store an offset for - * @param offset the offset - */ - public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) { - data.put((Map<String, Object>) partition, (Map<String, Object>) offset); - } - - private boolean flushing() { - return toFlush != null; - } - - /** - * Performs the first step of a flush operation, snapshotting the current state. This does not - * actually initiate the flush with the underlying storage. - * - * @return true if a flush was initiated, false if no data was available - */ - public synchronized boolean beginFlush() { - if (flushing()) { - log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the " - + "framework should not allow this"); - throw new CopycatException("OffsetStorageWriter is already flushing"); - } - - if (data.isEmpty()) - return false; - - assert !flushing(); - toFlush = data; - data = new HashMap<>(); - return true; - } - - /** - * Flush the current offsets and clear them from this writer. This is non-blocking: it - * moves the current set of offsets out of the way, serializes the data, and asynchronously - * writes the data to the backing store. If no offsets need to be written, the callback is - * still invoked, but no Future is returned. - * - * @return a Future, or null if there are no offsets to commitOffsets - */ - public Future<Void> doFlush(final Callback<Void> callback) { - final long flushId = currentFlushId; - - // Serialize - Map<ByteBuffer, ByteBuffer> offsetsSerialized; - try { - offsetsSerialized = new HashMap<>(); - for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) { - // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate - // for that data. The only enforcement of the format is here. 
- OffsetUtils.validateFormat(entry.getKey()); - OffsetUtils.validateFormat(entry.getValue()); - // When serializing the key, we add in the namespace information so the key is [namespace, real key] - byte[] key = keyConverter.fromCopycatData(namespace, null, Arrays.asList(namespace, entry.getKey())); - ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; - byte[] value = valueConverter.fromCopycatData(namespace, null, entry.getValue()); - ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null; - offsetsSerialized.put(keyBuffer, valueBuffer); - } - } catch (Throwable t) { - // Must handle errors properly here or the writer will be left mid-flush forever and be - // unable to make progress. - log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit " - + "offsets under namespace {}. This likely won't recover unless the " - + "unserializable partition or offset information is overwritten.", namespace); - log.error("Cause of serialization failure:", t); - callback.onCompletion(t, null); - return null; - } - - // And submit the data - log.debug("Submitting {} entries to backing store", offsetsSerialized.size()); - return backingStore.set(offsetsSerialized, new Callback<Void>() { - @Override - public void onCompletion(Throwable error, Void result) { - boolean isCurrent = handleFinishWrite(flushId, error, result); - if (isCurrent && callback != null) - callback.onCompletion(error, result); - } - }); - } - - /** - * Cancel a flush that has been initiated by {@link #beginFlush}. This should not be called if - * {@link #doFlush} has already been invoked. It should be used if an operation performed - * between beginFlush and doFlush failed. - */ - public synchronized void cancelFlush() { - // Verify we're still flushing data to handle a race between cancelFlush() calls from up the - // call stack and callbacks from the write request to underlying storage - if (flushing()) { - // Just recombine the data and place it back in the primary storage - toFlush.putAll(data); - data = toFlush; - currentFlushId++; - toFlush = null; - } - } - - /** - * Handle completion of a write. Returns true if this callback is for the current flush - * operation, false if it's for an old operation that should now be ignored. - */ - private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) { - // Callbacks need to be handled carefully since the flush operation may have already timed - // out and been cancelled. - if (flushId != currentFlushId) - return false; - - if (error != null) { - cancelFlush(); - } else { - currentFlushId++; - toFlush = null; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java deleted file mode 100644 index 9ba7662..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.copycat.data.CopycatSchema; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.errors.DataException; - -import java.util.Map; - -public class OffsetUtils { - public static void validateFormat(Object offsetData) { - if (offsetData == null) - return; - - if (!(offsetData instanceof Map)) - throw new DataException("Offsets must be specified as a Map"); - validateFormat((Map<Object, Object>) offsetData); - } - - public static <K, V> void validateFormat(Map<K, V> offsetData) { - // Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate - // that there's no usable concept of offsets in your source system. - if (offsetData == null) - return; - - for (Map.Entry<K, V> entry : offsetData.entrySet()) { - if (!(entry.getKey() instanceof String)) - throw new DataException("Offsets may only use String keys"); - - Object value = entry.getValue(); - if (value == null) - continue; - Schema.Type schemaType = CopycatSchema.schemaType(value.getClass()); - if (!schemaType.isPrimitive()) - throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java deleted file mode 100644 index 5cf1423..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -/** - * Generic interface for callbacks - */ -public interface Callback<V> { - /** - * Invoked upon completion of the operation.
- * - * @param error the error that caused the operation to fail, or null if no error occurred - * @param result the return value, or null if the operation failed - */ - void onCompletion(Throwable error, V result); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java deleted file mode 100644 index d4cf824..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.Serializable; - -/** - * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within - * the connector. - */ -public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> { - private final String connector; - private final int task; - - @JsonCreator - public ConnectorTaskId(@JsonProperty("connector") String connector, @JsonProperty("task") int task) { - this.connector = connector; - this.task = task; - } - - @JsonProperty - public String connector() { - return connector; - } - - @JsonProperty - public int task() { - return task; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - ConnectorTaskId that = (ConnectorTaskId) o; - - if (task != that.task) - return false; - if (connector != null ? !connector.equals(that.connector) : that.connector != null) - return false; - - return true; - } - - @Override - public int hashCode() { - int result = connector != null ? 
connector.hashCode() : 0; - result = 31 * result + task; - return result; - } - - @Override - public String toString() { - return connector + '-' + task; - } - - @Override - public int compareTo(ConnectorTaskId o) { - int connectorCmp = connector.compareTo(o.connector); - if (connectorCmp != 0) - return connectorCmp; - return ((Integer) task).compareTo(o.task); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java deleted file mode 100644 index 862adf9..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> { - - private Callback<T> underlying; - private CountDownLatch finishedLatch; - private T result = null; - private Throwable exception = null; - - public ConvertingFutureCallback(Callback<T> underlying) { - this.underlying = underlying; - this.finishedLatch = new CountDownLatch(1); - } - - public abstract T convert(U result); - - @Override - public void onCompletion(Throwable error, U result) { - this.exception = error; - this.result = convert(result); - if (underlying != null) - underlying.onCompletion(error, this.result); - finishedLatch.countDown(); - } - - @Override - public boolean cancel(boolean b) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return finishedLatch.getCount() == 0; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - finishedLatch.await(); - return result(); - } - - @Override - public T get(long l, TimeUnit timeUnit) - throws InterruptedException, ExecutionException, TimeoutException { - if (!finishedLatch.await(l, timeUnit)) - throw new TimeoutException("Timed out waiting for future"); - return result(); - } - - private T result() throws ExecutionException { - if (exception != null) { - throw new ExecutionException(exception); - } - return result; - } -} - 
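For reference, here is a small, hypothetical sketch (not part of this commit) showing how ConvertingFutureCallback adapts an asynchronous completion of one type into a Future of another; the byte[]-to-String conversion and the helper thread are illustrative assumptions. This mirrors how KafkaOffsetBackingStore.get() above converts the Void completion of readToEnd() into a map of offsets.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.copycat.util.ConvertingFutureCallback;

public class ConvertingFutureCallbackExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Converts the byte[] handed to onCompletion() into the String exposed through the Future.
        final ConvertingFutureCallback<byte[], String> future =
                new ConvertingFutureCallback<byte[], String>(null) {  // no underlying callback to chain
                    @Override
                    public String convert(byte[] result) {
                        return result == null ? null : new String(result, StandardCharsets.UTF_8);
                    }
                };

        // Simulate the asynchronous operation completing on another thread.
        new Thread(new Runnable() {
            @Override
            public void run() {
                future.onCompletion(null, "hello".getBytes(StandardCharsets.UTF_8));
            }
        }).start();

        // get() blocks on the internal latch; it would throw ExecutionException if an error had been passed in.
        System.out.println(future.get());  // prints "hello"
    }
}

FutureCallback, shown next, is simply the identity-converting specialization of this class.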
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java deleted file mode 100644 index 269482c..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -public class FutureCallback<T> extends ConvertingFutureCallback<T, T> { - - public FutureCallback(Callback<T> underlying) { - super(underlying); - } - - public FutureCallback() { - super(null); - } - - @Override - public T convert(T result) { - return result; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java deleted file mode 100644 index f5e72d3..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java +++ /dev/null @@ -1,331 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.util; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.errors.CopycatException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Future; - -/** - * <p> - * KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all - * clients need to consume and, at times, agree on their offset / that they have read to the end of the log. - * </p> - * <p> - * This functionality is useful for storing different types of data that all clients may need to agree on -- - * offsets or config for example. This class runs a consumer in a background thread to continuously tail the target - * topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful - * utilities like checking the current log end offset and waiting until the current end of the log is reached. - * </p> - * <p> - * To support different use cases, this class works with either single- or multi-partition topics. - * </p> - * <p> - * Since this class is generic, it delegates the details of data storage via a callback that is invoked for each - * record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the - * calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked - * and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required. - * </p> - */ -public class KafkaBasedLog<K, V> { - private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class); - private static final long CREATE_TOPIC_TIMEOUT_MS = 30000; - - private Time time; - private final String topic; - private final Map<String, Object> producerConfigs; - private final Map<String, Object> consumerConfigs; - private final Callback<ConsumerRecord<K, V>> consumedCallback; - private Consumer<K, V> consumer; - private Producer<K, V> producer; - - private Thread thread; - private boolean stopRequested; - private Queue<Callback<Void>> readLogEndOffsetCallbacks; - - /** - * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until - * {@link #start()} is invoked. - * - * @param topic the topic to treat as a log - * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must - * contain compatible serializer settings for the generic types used on this class. 
Some - * settings, such as the number of acks, will be overridden to ensure correct behavior of this - * class. - * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must - * contain compatible serializer settings for the generic types used on this class. Some - * settings, such as the auto offset reset policy, will be overridden to ensure correct - * behavior of this class. - * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log - * @param time Time interface - */ - public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, - Callback<ConsumerRecord<K, V>> consumedCallback, Time time) { - this.topic = topic; - this.producerConfigs = producerConfigs; - this.consumerConfigs = consumerConfigs; - this.consumedCallback = consumedCallback; - this.stopRequested = false; - this.readLogEndOffsetCallbacks = new ArrayDeque<>(); - this.time = time; - } - - public void start() { - log.info("Starting KafkaBasedLog with topic " + topic); - - producer = createProducer(); - consumer = createConsumer(); - - List<TopicPartition> partitions = new ArrayList<>(); - - // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing, - // we rely on topic auto-creation - List<PartitionInfo> partitionInfos = null; - long started = time.milliseconds(); - while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) { - partitionInfos = consumer.partitionsFor(topic); - Utils.sleep(Math.min(time.milliseconds() - started, 1000)); - } - if (partitionInfos == null) - throw new CopycatException("Could not look up partition metadata for offset backing store topic in" + - " the allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" + - " this is your first use of the topic it may have taken too long to create."); - - for (PartitionInfo partition : partitionInfos) - partitions.add(new TopicPartition(partition.topic(), partition.partition())); - consumer.assign(partitions); - - readToLogEnd(); - - thread = new WorkThread(); - thread.start(); - - log.info("Finished reading KafkaBasedLog for topic " + topic); - - log.info("Started KafkaBasedLog for topic " + topic); - } - - public void stop() { - log.info("Stopping KafkaBasedLog for topic " + topic); - - synchronized (this) { - stopRequested = true; - } - consumer.wakeup(); - - try { - thread.join(); - } catch (InterruptedException e) { - throw new CopycatException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " + - "down its producer and consumer.", e); - } - - try { - producer.close(); - } catch (KafkaException e) { - log.error("Failed to stop KafkaBasedLog producer", e); - } - - try { - consumer.close(); - } catch (KafkaException e) { - log.error("Failed to stop KafkaBasedLog consumer", e); - } - - log.info("Stopped KafkaBasedLog for topic " + topic); - } - - /** - * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback. - * Note that this checks the current offsets, reads to them, and invokes the callback regardless of whether - * additional records have been written to the log. If the caller needs to ensure they have truly reached the end - * of the log, they must ensure there are no other writers during this period. - * - * This waits until the end of all partitions has been reached. - * - * This method is asynchronous.
If you need a synchronous version, pass an instance of - * {@link org.apache.kafka.copycat.util.FutureCallback} as the {@code callback} parameter and wait on it to block. - * - * @param callback the callback to invoke once the end of the log has been reached. - */ - public void readToEnd(Callback<Void> callback) { - producer.flush(); - synchronized (this) { - readLogEndOffsetCallbacks.add(callback); - } - consumer.wakeup(); - } - - /** - * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback. - * @return the future associated with the operation - */ - public Future<Void> readToEnd() { - FutureCallback<Void> future = new FutureCallback<>(null); - readToEnd(future); - return future; - } - - public void send(K key, V value) { - send(key, value, null); - } - - public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) { - producer.send(new ProducerRecord<>(topic, key, value), callback); - } - - - private Producer<K, V> createProducer() { - // Always require producer acks to all to ensure durable writes - producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all"); - return new KafkaProducer<>(producerConfigs); - } - - private Consumer<K, V> createConsumer() { - // Always force reset to the beginning of the log since this class wants to consume all available log data - consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return new KafkaConsumer<>(consumerConfigs); - } - - private void poll(long timeoutMs) { - try { - ConsumerRecords<K, V> records = consumer.poll(timeoutMs); - for (ConsumerRecord<K, V> record : records) - consumedCallback.onCompletion(null, record); - } catch (WakeupException e) { - // Expected on get() or stop(). The calling code should handle this - throw e; - } catch (KafkaException e) { - log.error("Error polling: " + e); - } - } - - private void readToLogEnd() { - log.trace("Reading to end of offset log"); - - Set<TopicPartition> assignment = consumer.assignment(); - - // This approach to getting the current end offset is hacky until we have an API for looking these up directly - Map<TopicPartition, Long> offsets = new HashMap<>(); - for (TopicPartition tp : assignment) { - long offset = consumer.position(tp); - offsets.put(tp, offset); - consumer.seekToEnd(tp); - } - - Map<TopicPartition, Long> endOffsets = new HashMap<>(); - try { - poll(0); - } finally { - // If there is an exception, even a possibly expected one like WakeupException, we need to make sure - // the consumer's position is reset or it'll get into an inconsistent state.
- for (TopicPartition tp : assignment) { - long startOffset = offsets.get(tp); - long endOffset = consumer.position(tp); - if (endOffset > startOffset) { - endOffsets.put(tp, endOffset); - consumer.seek(tp, startOffset); - } - log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset); - } - } - - while (!endOffsets.isEmpty()) { - poll(Integer.MAX_VALUE); - - Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<TopicPartition, Long> entry = it.next(); - if (consumer.position(entry.getKey()) >= entry.getValue()) - it.remove(); - else - break; - } - } - } - - - private class WorkThread extends Thread { - @Override - public void run() { - try { - while (true) { - int numCallbacks; - synchronized (KafkaBasedLog.this) { - if (stopRequested) - break; - numCallbacks = readLogEndOffsetCallbacks.size(); - } - - if (numCallbacks > 0) { - try { - readToLogEnd(); - } catch (WakeupException e) { - // Either received another get() call and need to retry reading to end of log or stop() was - // called. Both are handled by restarting this loop. - continue; - } - } - - synchronized (KafkaBasedLog.this) { - // Only invoke exactly the number of callbacks we found before triggering the read to log end - // since it is possible for another write + readToEnd to sneak in in the meantime - for (int i = 0; i < numCallbacks; i++) { - Callback<Void> cb = readLogEndOffsetCallbacks.poll(); - cb.onCompletion(null, null); - } - } - - try { - poll(Integer.MAX_VALUE); - } catch (WakeupException e) { - // See previous comment, both possible causes of this wakeup are handled by starting this loop again - continue; - } - } - } catch (Throwable t) { - log.error("Unexpected exception in KafkaBasedLog's work thread", t); - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java deleted file mode 100644 index 3e23f29..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * <p> - * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown, - * a flag is set, which the thread should detect and try to exit gracefully from. In forcible - * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit - * gracefully, but then force it to exit if it takes too long. - * </p> - * <p> - * Implementations should override the {@link #execute} method and check {@link #getRunning} to - * determine whether they should try to gracefully exit. - * </p> - */ -public abstract class ShutdownableThread extends Thread { - private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class); - - private AtomicBoolean isRunning = new AtomicBoolean(true); - private CountDownLatch shutdownLatch = new CountDownLatch(1); - - /** - * An UncaughtExceptionHandler to register on every instance of this class. This is useful for - * testing, where AssertionExceptions in the thread may not cause the test to fail. Since one - * instance is used for all threads, it must be thread-safe. - */ - volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null; - - public ShutdownableThread(String name) { - // The default is daemon=true so that these threads will not prevent shutdown. We use this - // default because threads running user code may not clean up properly, even when we - // attempt to forcibly shut them down. - this(name, true); - } - - public ShutdownableThread(String name, boolean daemon) { - super(name); - this.setDaemon(daemon); - if (funcaughtExceptionHandler != null) - this.setUncaughtExceptionHandler(funcaughtExceptionHandler); - } - - /** - * Implementations should override this method with the main body for the thread. - */ - public abstract void execute(); - - /** - * Returns true if the thread hasn't exited yet and none of the shutdown methods have been - * invoked - */ - public boolean getRunning() { - return isRunning.get(); - } - - @Override - public void run() { - try { - execute(); - } catch (Error | RuntimeException e) { - log.error("Thread {} exiting with uncaught exception: ", getName(), e); - throw e; - } finally { - shutdownLatch.countDown(); - } - } - - /** - * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then - * forcibly interrupting the thread. - * @param gracefulTimeout the maximum time to wait for a graceful exit - * @param unit the time unit of the timeout argument - */ - public void shutdown(long gracefulTimeout, TimeUnit unit) - throws InterruptedException { - boolean success = gracefulShutdown(gracefulTimeout, unit); - if (!success) - forceShutdown(); - } - - /** - * Attempt graceful shutdown - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @return true if successful, false if the timeout elapsed - */ - public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException { - startGracefulShutdown(); - return awaitShutdown(timeout, unit); - } - - /** - * Start shutting down this thread gracefully, but do not block waiting for it to exit.
- */ - public void startGracefulShutdown() { - log.info("Starting graceful shutdown of thread {}", getName()); - isRunning.set(false); - } - - /** - * Awaits shutdown of this thread, waiting up to the timeout. - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @return true if successful, false if the timeout elapsed - * @throws InterruptedException - */ - public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException { - return shutdownLatch.await(timeout, unit); - } - - /** - * Immediately tries to force the thread to shut down by interrupting it. This does not try to - * wait for the thread to truly exit because forcible shutdown is not always possible. By - * default, threads are marked as daemon threads so they will not prevent the process from - * exiting. - */ - public void forceShutdown() throws InterruptedException { - log.info("Forcing shutdown of thread {}", getName()); - isRunning.set(false); - interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java deleted file mode 100644 index 0458054..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.apache.kafka.copycat.errors.RetriableException; -import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; -import org.apache.kafka.copycat.sink.SinkConnector; -import org.apache.kafka.copycat.sink.SinkRecord; -import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.storage.Converter; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.MockTime; -import org.easymock.Capture; -import org.easymock.CaptureType; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(WorkerSinkTask.class) -@PowerMockIgnore("javax.management.*") -public class WorkerSinkTaskTest { - // These are fixed to keep this code simpler. 
In this example we assume byte[] raw values - // with mix of integer/string in Copycat - private static final String TOPIC = "test"; - private static final int PARTITION = 12; - private static final int PARTITION2 = 13; - private static final long FIRST_OFFSET = 45; - private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; - private static final int KEY = 12; - private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA; - private static final String VALUE = "VALUE"; - private static final byte[] RAW_KEY = "key".getBytes(); - private static final byte[] RAW_VALUE = "value".getBytes(); - - private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION); - private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2); - - private static final Map<String, String> TASK_PROPS = new HashMap<>(); - static { - TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC); - } - - - private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); - private Time time; - private WorkerSinkTask workerTask; - @Mock - private SinkTask sinkTask; - private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture(); - private WorkerConfig workerConfig; - @Mock - private Converter keyConverter; - @Mock - private Converter valueConverter; - @Mock - private WorkerSinkTaskThread workerThread; - @Mock - private KafkaConsumer<byte[], byte[]> consumer; - private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture(); - - private long recordsReturned; - - @Before - public void setUp() { - time = new MockTime(); - Map<String, String> workerProps = new HashMap<>(); - workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("internal.key.converter.schemas.enable", "false"); - workerProps.put("internal.value.converter.schemas.enable", "false"); - workerConfig = new StandaloneConfig(workerProps); - workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, - taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); - - recordsReturned = 0; - } - - @Test - public void testPollRedelivery() throws Exception { - expectInitializeTask(); - - // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime - expectConsumerPoll(1); - expectConvertMessages(1); - Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL); - sinkTask.put(EasyMock.capture(records)); - EasyMock.expectLastCall().andThrow(new RetriableException("retry")); - // Pause - EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2))); - consumer.pause(TOPIC_PARTITION); - PowerMock.expectLastCall(); - consumer.pause(TOPIC_PARTITION2); - PowerMock.expectLastCall(); - - // Retry delivery should suceed - expectConsumerPoll(0); - sinkTask.put(EasyMock.capture(records)); - EasyMock.expectLastCall(); - // And unpause - EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2))); - consumer.resume(TOPIC_PARTITION); - PowerMock.expectLastCall(); - consumer.resume(TOPIC_PARTITION2); - PowerMock.expectLastCall(); - - 
PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - workerTask.poll(Long.MAX_VALUE); - workerTask.poll(Long.MAX_VALUE); - - PowerMock.verifyAll(); - } - - - private void expectInitializeTask() throws Exception { - PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); - PowerMock.expectPrivate(workerTask, "createWorkerThread") - .andReturn(workerThread); - workerThread.start(); - PowerMock.expectLastCall(); - - consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); - PowerMock.expectLastCall(); - - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { - @Override - public ConsumerRecords<byte[], byte[]> answer() throws Throwable { - rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - return ConsumerRecords.empty(); - } - }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); - PowerMock.expectLastCall(); - sinkTask.start(TASK_PROPS); - PowerMock.expectLastCall(); - } - - private void expectConsumerPoll(final int numMessages) { - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( - new IAnswer<ConsumerRecords<byte[], byte[]>>() { - @Override - public ConsumerRecords<byte[], byte[]> answer() throws Throwable { - List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); - for (int i = 0; i < numMessages; i++) - records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE)); - recordsReturned += numMessages; - return new ConsumerRecords<>( - numMessages > 0 ? - Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) : - Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap() - ); - } - }); - } - - private void expectConvertMessages(final int numMessages) { - EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages); - EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages); - } -}
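The readToLogEnd() loop in the deleted KafkaBasedLog follows a simple catch-up pattern: capture each assigned partition's end offset, seek back to the last stored position, then keep polling until every position has reached the offset that was recorded. A minimal standalone sketch of that loop against the plain consumer API is shown below; the CatchUpLoopSketch class, the readToEnd/handle helpers, and the byte[] key/value types are illustrative assumptions rather than part of the deleted file.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

// Illustrative only: the caller is assumed to have already seeked each partition back to its
// last stored offset and captured the end offsets observed at that moment, as the deleted
// readToLogEnd() does.
public class CatchUpLoopSketch {
    public static void readToEnd(Consumer<byte[], byte[]> consumer,
                                 Map<TopicPartition, Long> endOffsets) {
        while (!endOffsets.isEmpty()) {
            // Poll with an effectively unbounded timeout, mirroring poll(Integer.MAX_VALUE)
            // in the deleted code, and replay each record through the handler.
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Integer.MAX_VALUE))
                handle(record);

            // Forget every partition whose position has caught up to its recorded end offset;
            // the loop exits once all partitions have been fully replayed.
            endOffsets.entrySet().removeIf(
                entry -> consumer.position(entry.getKey()) >= entry.getValue());
        }
    }

    private static void handle(ConsumerRecord<byte[], byte[]> record) {
        // Stand-in for the consumed-record callback; application-specific processing goes here.
    }
}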
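The Javadoc of the deleted ShutdownableThread states the contract — override execute(), check getRunning() to honor a graceful-shutdown request, and rely on interruption as the forcible fallback — but the file carries no usage example. A hedged sketch of a conforming subclass follows; the QueueDrainingThread class and its queue are hypothetical.

import org.apache.kafka.copycat.util.ShutdownableThread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical subclass: drains a queue until shutdown is requested.
public class QueueDrainingThread extends ShutdownableThread {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public QueueDrainingThread() {
        super("queue-drainer");
    }

    @Override
    public void execute() {
        // Re-check getRunning() on every iteration so startGracefulShutdown() is noticed promptly.
        while (getRunning()) {
            try {
                String item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null)
                    process(item);
            } catch (InterruptedException e) {
                // forceShutdown() interrupts the thread; treat it as a request to exit now.
                return;
            }
        }
    }

    private void process(String item) {
        // Application-specific work goes here.
    }
}

A caller would start() the thread and later call shutdown(5, TimeUnit.SECONDS), which tries the graceful path first and interrupts the thread only if it has not exited within the timeout.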
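testPollRedelivery in the deleted WorkerSinkTaskTest encodes expected behavior rather than an implementation: when SinkTask.put() throws a RetriableException, the worker should pause all assigned partitions, re-deliver the same batch on the next poll cycle, and resume the partitions once put() succeeds. The sketch below illustrates that pause/retry/resume contract in isolation; the RetryingDelivery class, the deliver() method, and the pendingBatch field are hypothetical and are not taken from WorkerSinkTask.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.errors.RetriableException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;

import java.util.ArrayList;
import java.util.List;

// Hypothetical delivery helper illustrating the pause/retry/resume contract the test expects.
public class RetryingDelivery {
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final SinkTask task;
    private List<SinkRecord> pendingBatch;   // batch held back after a retriable failure

    public RetryingDelivery(KafkaConsumer<byte[], byte[]> consumer, SinkTask task) {
        this.consumer = consumer;
        this.task = task;
    }

    // Called once per poll cycle with the freshly converted records. While the consumer is
    // paused, the poll returns nothing, so "fresh" is empty and the held batch is retried.
    public void deliver(List<SinkRecord> fresh) {
        List<SinkRecord> batch = (pendingBatch != null) ? pendingBatch : new ArrayList<>(fresh);
        try {
            task.put(batch);
            if (pendingBatch != null) {
                // Success after a retry: resume consumption of all assigned partitions.
                for (TopicPartition tp : consumer.assignment())
                    consumer.resume(tp);
                pendingBatch = null;
            }
        } catch (RetriableException e) {
            // Hold the batch, stop fetching new data, and try the same records next cycle.
            for (TopicPartition tp : consumer.assignment())
                consumer.pause(tp);
            pendingBatch = batch;
        }
    }
}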