http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java new file mode 100644 index 0000000..c6e829c --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.storage; + +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.util.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * <p> + * OffsetStorageWriter is a buffered writer that wraps the simple OffsetBackingStore interface. + * It maintains a copy of the key-value data in memory and buffers writes. 
It allows you to take + * a snapshot, which can then be asynchronously flushed to the backing store while new writes + * continue to be processed. This allows Copycat to process offset commits in the background + * while continuing to process messages. + * </p> + * <p> + * Copycat uses an OffsetStorage implementation to save state about the current progress of + * source (import to Kafka) jobs, which may have many input partitions and "offsets" may not be as + * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs + * because they can use Kafka's native offset storage (or the sink data store can handle offset + * storage to achieve exactly once semantics). + * </p> + * <p> + * Both partitions and offsets are generic data objects. This allows different connectors to use + * whatever representation they need, even arbitrarily complex records. These are translated + * internally into the serialized form the OffsetBackingStore uses. + * </p> + * <p> + * Note that this only provides write functionality. This is intentional to ensure stale data is + * never read. Offset data should only be read during startup or reconfiguration of a task. By + * always serving those requests by reading the values from the backing store, we ensure we never + * accidentally use stale data. (One example of how this can occur: a task is processing input + * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere; + * reconfiguration causes partition A to be reassigned to this node, but now the offset data is out + * of date). Since these offsets are created and managed by the connector itself, there's no way + * for the offset management layer to know which keys are "owned" by which tasks at any given + * time. + * </p> + * <p> + * This class is not thread-safe. It should only be accessed from a Task's processing thread. 
+ * </p> + */ +public class OffsetStorageWriter<K, V> { + private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class); + + private final OffsetBackingStore backingStore; + private final Converter<K> keyConverter; + private final Converter<V> valueConverter; + private final Serializer<K> keySerializer; + private final Serializer<V> valueSerializer; + private final String namespace; + // Offset data in Copycat format + private Map<Object, Object> data = new HashMap<>(); + + // Not synchronized, should only be accessed by flush thread + private Map<Object, Object> toFlush = null; + // Unique ID for each flush request to handle callbacks after timeouts + private long currentFlushId = 0; + + public OffsetStorageWriter(OffsetBackingStore backingStore, + String namespace, Converter<K> keyConverter, Converter<V> valueConverter, + Serializer<K> keySerializer, Serializer<V> valueSerializer) { + this.backingStore = backingStore; + this.namespace = namespace; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + } + + /** + * Set an offset for a partition using Copycat data values + * @param partition the partition to store an offset for + * @param offset the offset + */ + public synchronized void setOffset(Object partition, Object offset) { + data.put(partition, offset); + } + + private boolean flushing() { + return toFlush != null; + } + + /** + * Performs the first step of a flush operation, snapshotting the current state. This does not + * actually initiate the flush with the underlying storage. 
+ * + * @return true if a flush was initiated, false if no data was available + */ + public synchronized boolean beginFlush() { + if (flushing()) { + log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the " + + "framework should not allow this"); + throw new CopycatException("OffsetStorageWriter is already flushing"); + } + + if (data.isEmpty()) + return false; + + assert !flushing(); + toFlush = data; + data = new HashMap<>(); + return true; + } + + /** + * Flush the current offsets and clear them from this writer. This is non-blocking: it + * moves the current set of offsets out of the way, serializes the data, and asynchronously + * writes the data to the backing store. If no offsets need to be written, the callback is + * still invoked, but no Future is returned. + * + * @return a Future, or null if there are no offsets to commitOffsets + */ + public Future<Void> doFlush(final Callback<Void> callback) { + final long flushId = currentFlushId; + + // Serialize + Map<ByteBuffer, ByteBuffer> offsetsSerialized; + try { + offsetsSerialized = new HashMap<>(); + for (Map.Entry<Object, Object> entry : toFlush.entrySet()) { + byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey())); + ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; + byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue())); + ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null; + offsetsSerialized.put(keyBuffer, valueBuffer); + } + } catch (Throwable t) { + // Must handle errors properly here or the writer will be left mid-flush forever and be + // unable to make progress. + log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit " + + "offsets under namespace {}. 
This likely won't recover unless the " + + "unserializable partition or offset information is overwritten.", namespace); + callback.onCompletion(t, null); + return null; + } + + // And submit the data + log.debug("Submitting {} entries to backing store", offsetsSerialized.size()); + return backingStore.set(namespace, offsetsSerialized, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + boolean isCurrent = handleFinishWrite(flushId, error, result); + if (isCurrent && callback != null) + callback.onCompletion(error, result); + } + }); + } + + /** + * Cancel a flush that has been initiated by {@link #beginFlush}. This should not be called if + * {@link #doFlush} has already been invoked. It should be used if an operation performed + * between beginFlush and doFlush failed. + */ + public synchronized void cancelFlush() { + // Verify we're still flushing data to handle a race between cancelFlush() calls from up the + // call stack and callbacks from the write request to underlying storage + if (flushing()) { + // Just recombine the data and place it back in the primary storage + toFlush.putAll(data); + data = toFlush; + currentFlushId++; + toFlush = null; + } + } + + /** + * Handle completion of a write. Returns true if this callback is for the current flush + * operation, false if it's for an old operation that should now be ignored. + */ + private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) { + // Callbacks need to be handled carefully since the flush operation may have already timed + // out and been cancelled. + if (flushId != currentFlushId) + return false; + + if (error != null) { + cancelFlush(); + } else { + currentFlushId++; + toFlush = null; + } + return true; + } +}
// [patch-email residue] header of the Callback.java hunk:
// http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
// ----------------------------------------------------------------------
// diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
// new file mode 100644
// index 0000000..5cf1423
// --- /dev/null
// +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
// @@ -0,0 +1,31 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.kafka.copycat.util;

/**
 * Generic completion callback: an operation reports its outcome by calling
 * {@link #onCompletion} with either an error or a result.
 *
 * @param <V> type of the result handed to {@link #onCompletion}
 */
public interface Callback<V> {
    /**
     * Called once the operation has completed.
     *
     * @param error  the error that caused the operation to fail, or null if no error occurred
     * @param result the return value, or null if the operation failed
     */
    void onCompletion(Throwable error, V result);
}
// [patch-email residue] preamble of the ConnectorTaskId.java hunk (its class follows on the next source line):
// http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
// ----------------------------------------------------------------------
// diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
// new file mode 100644
// index 0000000..44a9e41
// --- /dev/null
// +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
// @@ -0,0 +1,71 @@
// /**
//  * Licensed to the Apache Software Foundation (ASF) under one or more
//  * contributor license agreements. See the NOTICE file distributed with
//  * this work for additional information regarding copyright ownership.
//  * The ASF licenses this file to You under the Apache License, Version 2.0
//  * (the "License"); you may not use this file except in compliance with
//  * the License. You may obtain a copy of the License at
//  * <p/>
//  * http://www.apache.org/licenses/LICENSE-2.0
//  * <p/>
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  **/
//
// package org.apache.kafka.copycat.util;
//
// import java.io.Serializable;
//
// /**
//  * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
//  * the connector.
+ */ +public class ConnectorTaskId implements Serializable { + private final String connector; + private final int task; + + public ConnectorTaskId(String job, int task) { + this.connector = job; + this.task = task; + } + + public String getConnector() { + return connector; + } + + public int getTask() { + return task; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ConnectorTaskId that = (ConnectorTaskId) o; + + if (task != that.task) + return false; + if (connector != null ? !connector.equals(that.connector) : that.connector != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = connector != null ? connector.hashCode() : 0; + result = 31 * result + task; + return result; + } + + @Override + public String toString() { + return connector + '-' + task; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java new file mode 100644 index 0000000..278fdd3 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.util; + +import java.util.concurrent.*; + +public class FutureCallback<T> implements Callback<T>, Future<T> { + + private Callback<T> underlying; + private CountDownLatch finishedLatch; + private T result = null; + private Throwable exception = null; + + public FutureCallback(Callback<T> underlying) { + this.underlying = underlying; + this.finishedLatch = new CountDownLatch(1); + } + + @Override + public void onCompletion(Throwable error, T result) { + underlying.onCompletion(error, result); + this.exception = error; + this.result = result; + finishedLatch.countDown(); + } + + @Override + public boolean cancel(boolean b) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return finishedLatch.getCount() == 0; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + finishedLatch.await(); + return getResult(); + } + + @Override + public T get(long l, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + finishedLatch.await(l, timeUnit); + return getResult(); + } + + private T getResult() throws ExecutionException { + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java ---------------------------------------------------------------------- diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java new file mode 100644 index 0000000..3e23f29 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * <p> + * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown, + * a flag is set, which the thread should detect and try to exit gracefully from. In forcible + * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit + * gracefully, but then force it to exit if it takes too long. + * </p> + * <p> + * Implementations should override the {@link #execute} method and check {@link #getRunning} to + * determine whether they should try to gracefully exit. 
+ * </p> + */ +public abstract class ShutdownableThread extends Thread { + private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class); + + private AtomicBoolean isRunning = new AtomicBoolean(true); + private CountDownLatch shutdownLatch = new CountDownLatch(1); + + /** + * An UncaughtExceptionHandler to register on every instance of this class. This is useful for + * testing, where AssertionExceptions in the thread may not cause the test to fail. Since one + * instance is used for all threads, it must be thread-safe. + */ + volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null; + + public ShutdownableThread(String name) { + // The default is daemon=true so that these threads will not prevent shutdown. We use this + // default because threads that are running user code that may not clean up properly, even + // when we attempt to forcibly shut them down. + this(name, true); + } + + public ShutdownableThread(String name, boolean daemon) { + super(name); + this.setDaemon(daemon); + if (funcaughtExceptionHandler != null) + this.setUncaughtExceptionHandler(funcaughtExceptionHandler); + } + + /** + * Implementations should override this method with the main body for the thread. + */ + public abstract void execute(); + + /** + * Returns true if the thread hasn't exited yet and none of the shutdown methods have been + * invoked + */ + public boolean getRunning() { + return isRunning.get(); + } + + @Override + public void run() { + try { + execute(); + } catch (Error | RuntimeException e) { + log.error("Thread {} exiting with uncaught exception: ", getName(), e); + throw e; + } finally { + shutdownLatch.countDown(); + } + } + + /** + * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then + * forcibly interrupting the thread. 
+ * @param gracefulTimeout the maximum time to wait for a graceful exit + * @param unit the time unit of the timeout argument + */ + public void shutdown(long gracefulTimeout, TimeUnit unit) + throws InterruptedException { + boolean success = gracefulShutdown(gracefulTimeout, unit); + if (!success) + forceShutdown(); + } + + /** + * Attempt graceful shutdown + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return true if successful, false if the timeout elapsed + */ + public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException { + startGracefulShutdown(); + return awaitShutdown(timeout, unit); + } + + /** + * Start shutting down this thread gracefully, but do not block waiting for it to exit. + */ + public void startGracefulShutdown() { + log.info("Starting graceful shutdown of thread {}", getName()); + isRunning.set(false); + } + + /** + * Awaits shutdown of this thread, waiting up to the timeout. + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return true if successful, false if the timeout elapsed + * @throws InterruptedException + */ + public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException { + return shutdownLatch.await(timeout, unit); + } + + /** + * Immediately tries to force the thread to shut down by interrupting it. This does not try to + * wait for the thread to truly exit because forcible shutdown is not always possible. By + * default, threads are marked as daemon threads so they will not prevent the process from + * exiting. 
+ */ + public void forceShutdown() throws InterruptedException { + log.info("Forcing shutdown of thread {}", getName()); + isRunning.set(false); + interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java new file mode 100644 index 0000000..0c6f950 --- /dev/null +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.copycat.cli.WorkerConfig; +import org.apache.kafka.copycat.sink.SinkRecord; +import org.apache.kafka.copycat.sink.SinkTask; +import org.apache.kafka.copycat.sink.SinkTaskContext; +import org.apache.kafka.copycat.storage.Converter; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.apache.kafka.copycat.util.MockTime; +import org.apache.kafka.copycat.util.ThreadedTest; +import org.easymock.*; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.util.*; + +import static org.junit.Assert.assertEquals; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(WorkerSinkTask.class) +@PowerMockIgnore("javax.management.*") +public class WorkerSinkTaskTest extends ThreadedTest { + + // These are fixed to keep this code simpler. 
In this example we assume byte[] raw values + // with mix of integer/string in Copycat + private static final String TOPIC = "test"; + private static final int PARTITION = 12; + private static final long FIRST_OFFSET = 45; + private static final int KEY = 12; + private static final String VALUE = "VALUE"; + private static final byte[] RAW_KEY = "key".getBytes(); + private static final byte[] RAW_VALUE = "value".getBytes(); + + private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION); + + private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); + private Time time; + @Mock private SinkTask sinkTask; + private WorkerConfig workerConfig; + @Mock private Converter<byte[]> keyConverter; + @Mock + private Converter<byte[]> valueConverter; + private WorkerSinkTask<Integer, String> workerTask; + @Mock private KafkaConsumer<byte[], byte[]> consumer; + private WorkerSinkTaskThread workerThread; + + private long recordsReturned; + + @SuppressWarnings("unchecked") + @Override + public void setup() { + super.setup(); + time = new MockTime(); + Properties workerProps = new Properties(); + workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); + workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); + workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); + workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); + workerConfig = new WorkerConfig(workerProps); + workerTask = PowerMock.createPartialMock( + WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, + taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); + + recordsReturned = 0; + } + + @Test + public void 
testPollsInBackground() throws Exception { + Properties taskProps = new Properties(); + + expectInitializeTask(taskProps); + Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L); + expectStopTask(10L); + + PowerMock.replayAll(); + + workerTask.start(taskProps); + for (int i = 0; i < 10; i++) { + workerThread.iteration(); + } + workerTask.stop(); + // No need for awaitStop since the thread is mocked + workerTask.close(); + + // Verify contents match expected values, i.e. that they were translated properly. With max + // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches + assertEquals(10, capturedRecords.getValues().size()); + int offset = 0; + for (Collection<SinkRecord> recs : capturedRecords.getValues()) { + assertEquals(1, recs.size()); + for (SinkRecord rec : recs) { + SinkRecord referenceSinkRecord + = new SinkRecord(TOPIC, PARTITION, KEY, VALUE, FIRST_OFFSET + offset); + assertEquals(referenceSinkRecord, rec); + offset++; + } + } + + PowerMock.verifyAll(); + } + + @Test + public void testDeliverConvertsData() throws Exception { + // Validate conversion is performed when data is delivered + Integer record = 12; + + ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( + Collections.singletonMap( + new TopicPartition("topic", 0), + Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE)))); + + // Exact data doesn't matter, but should be passed directly to sink task + EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record); + EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record); + Capture<Collection<SinkRecord>> capturedRecords + = EasyMock.newCapture(CaptureType.ALL); + sinkTask.put(EasyMock.capture(capturedRecords)); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + Whitebox.invokeMethod(workerTask, "deliverMessages", records); + assertEquals(record, capturedRecords.getValue().iterator().next().getKey()); + 
assertEquals(record, capturedRecords.getValue().iterator().next().getValue()); + + PowerMock.verifyAll(); + } + + @Test + public void testCommit() throws Exception { + Properties taskProps = new Properties(); + + expectInitializeTask(taskProps); + // Make each poll() take the offset commit interval + Capture<Collection<SinkRecord>> capturedRecords + = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); + expectOffsetFlush(1L, null, null, 0, true); + expectStopTask(2); + + PowerMock.replayAll(); + + workerTask.start(taskProps); + // First iteration gets one record + workerThread.iteration(); + // Second triggers commit, gets a second offset + workerThread.iteration(); + // Commit finishes synchronously for testing so we can check this immediately + assertEquals(0, workerThread.getCommitFailures()); + workerTask.stop(); + workerTask.close(); + + assertEquals(2, capturedRecords.getValues().size()); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitTaskFlushFailure() throws Exception { + Properties taskProps = new Properties(); + + expectInitializeTask(taskProps); + Capture<Collection<SinkRecord>> capturedRecords + = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); + expectOffsetFlush(1L, new RuntimeException(), null, 0, true); + expectStopTask(2); + + PowerMock.replayAll(); + + workerTask.start(taskProps); + // Second iteration triggers commit + workerThread.iteration(); + workerThread.iteration(); + assertEquals(1, workerThread.getCommitFailures()); + assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.stop(); + workerTask.close(); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitConsumerFailure() throws Exception { + Properties taskProps = new Properties(); + + expectInitializeTask(taskProps); + Capture<Collection<SinkRecord>> capturedRecords + = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); + expectOffsetFlush(1L, null, new Exception(), 0, true); + 
expectStopTask(2); + + PowerMock.replayAll(); + + workerTask.start(taskProps); + // Second iteration triggers commit + workerThread.iteration(); + workerThread.iteration(); + // TODO Response to consistent failures? + assertEquals(1, workerThread.getCommitFailures()); + assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.stop(); + workerTask.close(); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitTimeout() throws Exception { + Properties taskProps = new Properties(); + + expectInitializeTask(taskProps); + // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit + Capture<Collection<SinkRecord>> capturedRecords + = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2); + expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false); + expectStopTask(4); + + PowerMock.replayAll(); + + workerTask.start(taskProps); + // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't + // trigger another commit + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + // TODO Response to consistent failures? 
+ assertEquals(1, workerThread.getCommitFailures()); + assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.stop(); + workerTask.close(); + + PowerMock.verifyAll(); + } + + private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps) + throws Exception { + sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class)); + PowerMock.expectLastCall(); + sinkTask.start(taskProps); + PowerMock.expectLastCall(); + + PowerMock.expectPrivate(workerTask, "createConsumer", taskProps) + .andReturn(consumer); + workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"}, + workerTask, "mock-worker-thread", time, + workerConfig); + PowerMock.expectPrivate(workerTask, "createWorkerThread") + .andReturn(workerThread); + workerThread.start(); + PowerMock.expectLastCall(); + return consumer; + } + + private void expectStopTask(final long expectedMessages) throws Exception { + final long finalOffset = FIRST_OFFSET + expectedMessages - 1; + + sinkTask.stop(); + PowerMock.expectLastCall(); + + // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the + // consumer so it exits quickly + consumer.wakeup(); + PowerMock.expectLastCall(); + + consumer.close(); + PowerMock.expectLastCall(); + } + + // Note that this can only be called once per test currently + private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception { + // Stub out all the consumer stream/iterator responses, which we just want to verify occur, + // but don't care about the exact details here. 
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer( + new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + // "Sleep" so time will progress + time.sleep(pollDelayMs); + ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( + Collections.singletonMap( + new TopicPartition(TOPIC, PARTITION), + Arrays.asList( + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE) + ))); + recordsReturned++; + return records; + } + }); + EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes(); + EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes(); + Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL); + sinkTask.put(EasyMock.capture(capturedRecords)); + EasyMock.expectLastCall().anyTimes(); + return capturedRecords; + } + + private Capture<ConsumerCommitCallback> expectOffsetFlush(final long expectedMessages, + final RuntimeException flushError, + final Exception consumerCommitError, + final long consumerCommitDelayMs, + final boolean invokeCallback) + throws Exception { + final long finalOffset = FIRST_OFFSET + expectedMessages - 1; + + EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION)); + EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer( + new IAnswer<Long>() { + @Override + public Long answer() throws Throwable { + return FIRST_OFFSET + recordsReturned - 1; + } + } + ); + + sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, finalOffset)); + IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall(); + if (flushError != null) { + flushExpectation.andThrow(flushError).once(); + return null; + } + + final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture(); + final Map<TopicPartition, Long> offsets = Collections.singletonMap(TOPIC_PARTITION, finalOffset); + 
consumer.commit(EasyMock.eq(offsets), + EasyMock.eq(CommitType.ASYNC), + EasyMock.capture(capturedCallback)); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + time.sleep(consumerCommitDelayMs); + if (invokeCallback) + capturedCallback.getValue().onComplete(offsets, consumerCommitError); + return null; + } + }); + return capturedCallback; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java new file mode 100644 index 0000000..60e1462 --- /dev/null +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link WorkerSourceTask} in which the source task, both converters, the producer and
+ * the offset reader/writer are mocks. The tests start the worker task, wait until the mocked
+ * source task has been polled, and then check how offset commits and record conversion are
+ * driven.
+ */
+@RunWith(PowerMockRunner.class)
+public class WorkerSourceTaskTest extends ThreadedTest {
+    private static final byte[] PARTITION_BYTES = "partition".getBytes();
+    private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
+
+    // Copycat-format data
+    private static final Integer KEY = -1;
+    private static final Long RECORD = 12L;
+    // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
+    // is used in the right place.
+    private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
+    private static final String CONVERTED_RECORD = "converted-record";
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private WorkerConfig config;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter<ByteBuffer> keyConverter;
+    @Mock private Converter<String> valueConverter;
+    @Mock private KafkaProducer<ByteBuffer, String> producer;
+    @Mock private OffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
+    private WorkerSourceTask<ByteBuffer, String> workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;
+
+    // Captures the callbacks handed to producer.send() so the stubbed send can complete them
+    private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
+
+    private static final Properties EMPTY_TASK_PROPS = new Properties();
+    private static final List<SourceRecord> RECORDS = Arrays.asList(
+            new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD)
+    );
+
+    /**
+     * Builds the worker configuration (JSON converters and (de)serializers) and a fresh capture
+     * for producer callbacks.
+     */
+    @Override
+    public void setup() {
+        super.setup();
+        Properties workerProps = new Properties();
+        workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        config = new WorkerConfig(workerProps);
+        producerCallbacks = EasyMock.newCapture();
+    }
+
+    /** Creates the task under test from the mocked collaborators. */
+    private void createWorkerTask() {
+        workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
+                offsetReader, offsetWriter, config, new SystemTime());
+    }
+
+    /** Records the initialize() and start() calls expected on the source task at startup. */
+    private void expectTaskStart() throws Exception {
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        createWorkerTask();
+
+        expectTaskStart();
+
+        final CountDownLatch pollLatch = expectPolls(10);
+        // In this test, we don't flush, so nothing goes any further than the offset writer
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(EMPTY_TASK_PROPS);
+        awaitPolls(pollLatch);
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        expectTaskStart();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(EMPTY_TASK_PROPS);
+        awaitPolls(pollLatch);
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitFailure() throws Exception {
+        // Test that commitOffsets() reports failure when the offset flush times out, and that the
+        // pending flush is cancelled in that case
+        createWorkerTask();
+
+        expectTaskStart();
+
+        // We'll wait for some data, then trigger a flush that fails
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(false);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(EMPTY_TASK_PROPS);
+        awaitPolls(pollLatch);
+        assertFalse(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsConvertsData() throws Exception {
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        // Can just use the same record for key and value
+        records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
+
+        Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
+
+        PowerMock.replayAll();
+
+        Whitebox.invokeMethod(workerTask, "sendRecords", records);
+        assertEquals(CONVERTED_KEY, sent.getValue().key());
+        assertEquals(CONVERTED_RECORD, sent.getValue().value());
+
+        PowerMock.verifyAll();
+    }
+
+
+    /**
+     * Stubs sourceTask.poll() to return {@link #RECORDS} and counts each call down on a latch.
+     *
+     * @param count number of poll() calls the returned latch waits for
+     * @return latch that reaches zero once poll() has been called at least {@code count} times
+     */
+    private CountDownLatch expectPolls(int count) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(count);
+        // Note that we stub these to allow any number of calls because the thread will continue to
+        // run. The count passed in + latch returned just makes sure we get *at least* that number of
+        // calls
+        EasyMock.expect(sourceTask.poll())
+                .andStubAnswer(new IAnswer<List<SourceRecord>>() {
+                    @Override
+                    public List<SourceRecord> answer() throws Throwable {
+                        latch.countDown();
+                        return RECORDS;
+                    }
+                });
+        // Fallout of the poll() call
+        expectSendRecord();
+        return latch;
+    }
+
+    /**
+     * Stubs the conversion, producer send and offset write that follow a polled record.
+     *
+     * @return capture holding the record passed to producer.send()
+     */
+    private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
+        EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
+        EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
+
+        Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
+        // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
+        EasyMock.expect(
+                producer.send(EasyMock.capture(sent),
+                        EasyMock.capture(producerCallbacks)))
+                .andStubAnswer(new IAnswer<Future<RecordMetadata>>() {
+                    @Override
+                    public Future<RecordMetadata> answer() throws Throwable {
+                        synchronized (producerCallbacks) {
+                            for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
+                                cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null);
+                            }
+                            producerCallbacks.reset();
+                        }
+                        return sendFuture;
+                    }
+                });
+        // 2. Offset data is passed to the offset storage.
+        offsetWriter.setOffset(PARTITION_BYTES, OFFSET_BYTES);
+        PowerMock.expectLastCall().anyTimes();
+
+        return sent;
+    }
+
+    /**
+     * Waits up to one second for the expected number of polls and fails the test if they did not
+     * happen. The poll expectations are stubs, so verifyAll() alone would not notice a task that
+     * was never polled.
+     */
+    private void awaitPolls(CountDownLatch latch) throws InterruptedException {
+        assertTrue("Timed out waiting for the source task to be polled",
+                latch.await(1000, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Records the offset writer calls for one flush attempt.
+     *
+     * @param succeed if true the flush future's get() returns normally; otherwise it throws a
+     *                TimeoutException and a cancelFlush() call is expected
+     */
+    @SuppressWarnings("unchecked")
+    private void expectOffsetFlush(boolean succeed) throws Exception {
+        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+        Future<Void> flushFuture = PowerMock.createMock(Future.class);
+        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
+        // Should throw for failure
+        IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
+                flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
+        if (succeed) {
+            futureGetExpect.andReturn(null);
+        } else {
+            futureGetExpect.andThrow(new TimeoutException());
+            offsetWriter.cancelFlush();
+            PowerMock.expectLastCall();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
new file mode 100644
index
0000000..32e7ff9
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.MockTime;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Exercises adding and stopping tasks on a {@link Worker}. Task instantiation and the
+ * WorkerSourceTask constructor are intercepted with PowerMock so that only the calls the worker
+ * makes on the resulting mock are checked.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Worker.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerTest extends ThreadedTest {
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private Worker worker;
+    private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
+    private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
+    private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
+    private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
+    private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
+
+    /** Creates and starts a worker backed by the mocked offset store and (de)serializers. */
+    @Before
+    public void setup() {
+        super.setup();
+
+        worker = new Worker(new MockTime(), jsonWorkerConfig(), offsetBackingStore,
+                offsetKeySerializer, offsetValueSerializer,
+                offsetKeyDeserializer, offsetValueDeserializer);
+        worker.start();
+    }
+
+    /** Worker configuration using the JSON converter, serializer and deserializer classes. */
+    private static WorkerConfig jsonWorkerConfig() {
+        Properties settings = new Properties();
+        settings.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        settings.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        settings.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+        settings.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+        settings.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        settings.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        return new WorkerConfig(settings);
+    }
+
+    /**
+     * Records everything expected while the worker adds a task: the static instantiateTask call,
+     * construction of the WorkerSourceTask, and its start with the given properties.
+     *
+     * @param startProps properties the worker task is expected to be started with
+     * @return the mocked worker task that the intercepted constructor hands back
+     */
+    private WorkerSourceTask expectTaskCreation(Properties startProps) throws Exception {
+        TestSourceTask sourceTask = PowerMock.createMock(TestSourceTask.class);
+        WorkerSourceTask createdTask = PowerMock.createMock(WorkerSourceTask.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(sourceTask);
+
+        PowerMock.expectNew(
+                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(sourceTask),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(KafkaProducer.class),
+                EasyMock.anyObject(OffsetStorageReader.class),
+                EasyMock.anyObject(OffsetStorageWriter.class),
+                EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.anyObject(Time.class))
+                .andReturn(createdTask);
+        createdTask.start(startProps);
+        EasyMock.expectLastCall();
+        return createdTask;
+    }
+
+    @Test
+    public void testAddRemoveTask() throws Exception {
+        // Create
+        Properties startProps = new Properties();
+        WorkerSourceTask createdTask = expectTaskCreation(startProps);
+
+        // Remove
+        createdTask.stop();
+        EasyMock.expectLastCall();
+        EasyMock.expect(createdTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
+        createdTask.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker.addTask(taskId, TestSourceTask.class.getName(), startProps);
+        worker.stopTask(taskId);
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testStopInvalidTask() {
+        // No task was ever added under this id
+        worker.stopTask(taskId);
+    }
+
+    @Test
+    public void testCleanupTasksOnStop() throws Exception {
+        // Create
+        Properties startProps = new Properties();
+        WorkerSourceTask createdTask = expectTaskCreation(startProps);
+
+        // Remove on Worker.stop()
+        createdTask.stop();
+        EasyMock.expectLastCall();
+        EasyMock.expect(createdTask.awaitStop(EasyMock.anyLong())).andReturn(true);
+        // Note that in this case we *do not* commit offsets since it's an unclean shutdown
+        createdTask.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker.addTask(taskId, TestSourceTask.class.getName(), startProps);
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+
+    /** Minimal concrete source task; the tests only need its class and a mock of it. */
+    private static class TestSourceTask extends SourceTask {
+        public TestSourceTask() {
+        }
+
+        @Override
+        public void start(Properties props) {
+        }
+
+        @Override
+        public List<SourceRecord> poll() throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
new file mode 100644
index 0000000..5ac7e38
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceConnector;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link StandaloneHerder} against a mocked {@link Worker}. Connector instantiation is
+ * intercepted with PowerMock, so the tests check the calls the herder makes on the connector,
+ * the worker and the creation callback when connectors are added and deleted.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StandaloneHerder.class})
+@PowerMockIgnore("javax.management.*")
+public class StandaloneHerderTest {
+    private static final String CONNECTOR_NAME = "test";
+    private static final String TOPICS_LIST_STR = "topic1,topic2";
+
+    private StandaloneHerder herder;
+    @Mock protected Worker worker;
+    private Connector connector;
+    @Mock protected Callback<String> createCallback;
+
+    private Properties connectorProps;
+    private Properties taskProps;
+
+    /** Creates the herder around a mocked worker and prepares connector and task properties. */
+    @Before
+    public void setup() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = new StandaloneHerder(worker);
+
+        connectorProps = new Properties();
+        connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+        connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+        PowerMock.mockStatic(StandaloneHerder.class);
+
+        // These can be anything since connectors can pass along whatever they want.
+        taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+    }
+
+    @Test
+    public void testCreateSourceConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceClass.class);
+        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateSinkConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSinkClass.class);
+        expectAdd(BogusSinkClass.class, BogusSinkTask.class, true);
+
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDestroyConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceClass.class);
+        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        expectDestroy();
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+        // The wrapped callback does nothing; the FutureCallback is only used to wait for completion
+        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+
+            }
+        });
+        herder.deleteConnector(CONNECTOR_NAME, futureCb);
+        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+        PowerMock.verifyAll();
+    }
+
+
+    /** Expects a connector to be created through addConnector(), including the creation callback. */
+    private void expectAdd(Class<? extends Connector> connClass,
+                           Class<? extends Task> taskClass,
+                           boolean sink) throws Exception {
+        expectCreate(connClass, taskClass, sink, true);
+    }
+
+    private void expectRestore(Class<? extends Connector> connClass,
+                               Class<? extends Task> taskClass) throws Exception {
+        // Restore never uses a callback. These tests always use sources
+        expectCreate(connClass, taskClass, false, false);
+    }
+
+    /**
+     * Records the calls expected while the herder creates a connector and its single task.
+     *
+     * @param connClass      connector class written into the connector properties
+     * @param taskClass      task class the mocked connector reports
+     * @param sink           whether the generated task properties should carry the topics list
+     * @param expectCallback whether the creation callback is expected to be completed
+     */
+    private void expectCreate(Class<? extends Connector> connClass,
+                              Class<? extends Task> taskClass,
+                              boolean sink, boolean expectCallback) throws Exception {
+        connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+
+        PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
+                .andReturn(connector);
+        if (expectCallback) {
+            createCallback.onCompletion(null, CONNECTOR_NAME);
+            PowerMock.expectLastCall();
+        }
+
+        connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
+        PowerMock.expectLastCall();
+        connector.start(new Properties());
+        PowerMock.expectLastCall();
+
+        // Just return the connector properties for the individual task we generate by default
+        EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
+
+        EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
+                .andReturn(Arrays.asList(taskProps));
+        // And we should instantiate the tasks. For a sink task, we should see added properties for
+        // the input topic partitions
+        Properties generatedTaskProps = new Properties();
+        generatedTaskProps.putAll(taskProps);
+        if (sink)
+            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
+        PowerMock.expectLastCall();
+    }
+
+    /** Expects the connector's single task and then the connector itself to be stopped. */
+    private void expectStop() {
+        worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
+        EasyMock.expectLastCall();
+        connector.stop();
+        EasyMock.expectLastCall();
+    }
+
+    private void expectDestroy() {
+        expectStop();
+    }
+
+    // We need to use a real class here due to some issue with mocking java.lang.Class
+    private abstract class BogusSourceClass extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
+
+    private abstract class BogusSinkClass extends SinkConnector {
+    }
+
+    // Must be a SinkTask so the sink connector test registers a sink task class
+    private abstract class BogusSinkTask extends SinkTask {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
new file mode 100644
index 0000000..bbcbdc9
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.util.Callback;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FileOffsetBackingStore} using a temporary file: values written with set() can
+ * be read back with get(), both from the same store and from a second store configured with the
+ * same file after the first one has been stopped.
+ */
+public class FileOffsetBackingStoreTest {
+
+    FileOffsetBackingStore store;
+    Map<String, Object> props;
+    File tempFile;
+
+    // Data written by every test: one real entry plus a null key mapped to a null value
+    private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
+
+    static {
+        firstSet.put(buffer("key"), buffer("value"));
+        firstSet.put(null, null);
+    }
+
+    /** Points a new store at a fresh temporary file and starts it. */
+    @Before
+    public void setup() throws IOException {
+        store = new FileOffsetBackingStore();
+        tempFile = File.createTempFile("fileoffsetbackingstore", null);
+        props = new HashMap<>();
+        String storagePath = tempFile.getAbsolutePath();
+        props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, storagePath);
+        store.configure(props);
+        store.start();
+    }
+
+    /** Removes the temporary file created in setup(). */
+    @After
+    public void teardown() {
+        tempFile.delete();
+    }
+
+    @Test
+    public void testGetSet() throws Exception {
+        Callback<Void> onSet = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> onGet = expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set("namespace", firstSet, onSet).get();
+
+        // Ask for one key that was written and one that was not
+        Map<ByteBuffer, ByteBuffer> found =
+                store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), onGet).get();
+        assertEquals(buffer("value"), found.get(buffer("key")));
+        assertEquals(null, found.get(buffer("bad")));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSaveRestore() throws Exception {
+        Callback<Void> onSet = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> onGet = expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set("namespace", firstSet, onSet).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore reloaded = new FileOffsetBackingStore();
+        reloaded.configure(props);
+        reloaded.start();
+        Map<ByteBuffer, ByteBuffer> found =
+                reloaded.get("namespace", Arrays.asList(buffer("key")), onGet).get();
+        assertEquals(buffer("value"), found.get(buffer("key")));
+
+        PowerMock.verifyAll();
+    }
+
+    /** Wraps the bytes of the given string in a ByteBuffer. */
+    private static ByteBuffer buffer(String v) {
+        byte[] raw = v.getBytes();
+        return ByteBuffer.wrap(raw);
+    }
+
+    /** Mock callback that expects one completion with a null error and a null result. */
+    private Callback<Void> expectSuccessfulSetCallback() {
+        @SuppressWarnings("unchecked")
+        Callback<Void> onSet = PowerMock.createMock(Callback.class);
+        onSet.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
+        PowerMock.expectLastCall();
+        return onSet;
+    }
+
+    /** Mock callback that expects one completion with a null error and any map as the result. */
+    @SuppressWarnings("unchecked")
+    private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
+        Callback<Map<ByteBuffer, ByteBuffer>> onGet = PowerMock.createMock(Callback.class);
+        onGet.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
+        PowerMock.expectLastCall();
+        return onGet;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java new file mode 100644 index 0000000..3d49f05 --- /dev/null +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.storage; + +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.util.Callback; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(PowerMockRunner.class) +public class OffsetStorageWriterTest { + private static final String NAMESPACE = "namespace"; + // Copycat format - any types should be accepted here + private static final List<String> OFFSET_KEY = Arrays.asList("key", "key"); + private static final String OFFSET_VALUE = "value"; + // Native objects - must match serializer types + private static final int OFFSET_KEY_CONVERTED = 12; + private static final String OFFSET_VALUE_CONVERTED = "value-converted"; + // Serialized forms handed to the backing store + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); + private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED + = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED), + ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED)); + + @Mock private OffsetBackingStore store; + @Mock private Converter<Integer> keyConverter; + @Mock private Converter<String> valueConverter; + @Mock private Serializer<Integer> keySerializer; + @Mock private Serializer<String> valueSerializer; + private OffsetStorageWriter<Integer, String> writer; + 
private static Exception exception = new RuntimeException("error"); + + private ExecutorService service; + + @Before + public void setup() { + writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer); + service = Executors.newFixedThreadPool(1); + } + + @After + public void teardown() { + service.shutdownNow(); + } + + @Test + public void testWriteFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + expectStore(callback, false); + + PowerMock.replayAll(); + + writer.setOffset(OFFSET_KEY, OFFSET_VALUE); + + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + @Test + public void testNoOffsetsToFlush() { + // If no offsets are flushed, we should finish immediately and not have made any calls to the + // underlying storage layer + + PowerMock.replayAll(); + + // Nothing was written, so beginFlush() should report that there is nothing to flush + assertFalse(writer.beginFlush()); + + PowerMock.verifyAll(); + } + + @Test + public void testFlushFailureReplacesOffsets() throws Exception { + // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored + // such that a subsequent flush will write them. 
+ + @SuppressWarnings("unchecked") + final Callback<Void> callback = PowerMock.createMock(Callback.class); + // First time the write fails + expectStore(callback, true); + // Second time it succeeds + expectStore(callback, false); + // Third time it has no data to flush so we won't get past beginFlush() + + PowerMock.replayAll(); + + writer.setOffset(OFFSET_KEY, OFFSET_VALUE); + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + assertFalse(writer.beginFlush()); + + PowerMock.verifyAll(); + } + + @Test(expected = CopycatException.class) + public void testAlreadyFlushing() throws Exception { + @SuppressWarnings("unchecked") + final Callback<Void> callback = PowerMock.createMock(Callback.class); + // Trigger the send, but don't invoke the callback so we'll still be mid-flush + CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); + expectStore(null, false, allowStoreCompleteCountdown); + + PowerMock.replayAll(); + + writer.setOffset(OFFSET_KEY, OFFSET_VALUE); + assertTrue(writer.beginFlush()); + writer.doFlush(callback); + assertTrue(writer.beginFlush()); // should throw + + PowerMock.verifyAll(); + } + + @Test + public void testCancelBeforeAwaitFlush() { + PowerMock.replayAll(); + + writer.setOffset(OFFSET_KEY, OFFSET_VALUE); + assertTrue(writer.beginFlush()); + writer.cancelFlush(); + + PowerMock.verifyAll(); + } + + @Test + public void testCancelAfterAwaitFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); + // In this test, the write should be cancelled so the callback will not be invoked and is not + // passed to the expectStore call + expectStore(null, false, allowStoreCompleteCountdown); + + PowerMock.replayAll(); + + writer.setOffset(OFFSET_KEY, OFFSET_VALUE); + 
assertTrue(writer.beginFlush()); + // Start the flush, then immediately cancel before allowing the mocked store request to finish + Future<Void> flushFuture = writer.doFlush(callback); + writer.cancelFlush(); + allowStoreCompleteCountdown.countDown(); + flushFuture.get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + /** + * Expect a store request that completes without waiting on any latch. + */ + private void expectStore(final Callback<Void> callback, final boolean fail) { + expectStore(callback, fail, null); + } + + /** + * Expect a request to store data to the underlying OffsetBackingStore. The converter and + * serializer mocks are primed to turn the test key/value into their serialized forms, and the + * mocked store completes its captured callback from a task submitted to the test's executor. + * + * @param callback the callback expected to be invoked when completed, or null if the callback + * isn't expected to be invoked + * @param fail if true, the mocked store completes its callback with the shared test exception + * (and that same exception is expected on a non-null callback); if false, it + * completes successfully + * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before + * invoking the callback. A (generous) timeout is still imposed to + * ensure tests complete. 
+ */ + private void expectStore(final Callback<Void> callback, + final boolean fail, + final CountDownLatch waitForCompletion) { + EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED); + EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED); + EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED); + EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED); + + final Capture<Callback<Void>> storeCallback = Capture.newInstance(); + EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED), + EasyMock.capture(storeCallback))) + .andAnswer(new IAnswer<Future<Void>>() { + @Override + public Future<Void> answer() throws Throwable { + return service.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (waitForCompletion != null) + assertTrue(waitForCompletion.await(10000, 
TimeUnit.MILLISECONDS)); + + if (fail) { + storeCallback.getValue().onCompletion(exception, null); + } else { + storeCallback.getValue().onCompletion(null, null); + } + return null; + } + }); + } + }); + if (callback != null) { + if (fail) { + callback.onCompletion(EasyMock.eq(exception), EasyMock.eq((Void) null)); + } else { + callback.onCompletion(null, null); + } + } + PowerMock.expectLastCall(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java new file mode 100644 index 0000000..53149db --- /dev/null +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ +package org.apache.kafka.copycat.util; + +import org.apache.kafka.common.utils.Time; + +import java.util.concurrent.TimeUnit; + +/** + * A {@link Time} implementation for tests. The clock is seeded from System.nanoTime() at + * construction and afterwards only moves when sleep(long) is called, which advances it by the + * requested number of milliseconds without blocking. + */ +public class MockTime implements Time { + + private long nanos = 0; + + public MockTime() { + this.nanos = System.nanoTime(); + } + + @Override + public long milliseconds() { + return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS); + } + + @Override + public long nanoseconds() { + return nanos; + } + + @Override + public void sleep(long ms) { + this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java new file mode 100644 index 0000000..4880ca1 --- /dev/null +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.util; + +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ShutdownableThreadTest { + + @Test + public void testGracefulShutdown() throws InterruptedException { + ShutdownableThread thread = new ShutdownableThread("graceful") { + @Override + public void execute() { + while (getRunning()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // Ignore + } + } + } + }; + thread.start(); + Thread.sleep(10); + assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS)); + } + + @Test + public void testForcibleShutdown() throws InterruptedException { + final CountDownLatch startedLatch = new CountDownLatch(1); + ShutdownableThread thread = new ShutdownableThread("forcible") { + @Override + public void execute() { + try { + startedLatch.countDown(); + Thread.sleep(100000); + } catch (InterruptedException e) { + // Ignore + } + } + }; + thread.start(); + startedLatch.await(); + thread.forceShutdown(); + // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in + // certain conditions, but in this case we know the thread is interruptible so we should be + // able to join() it + thread.join(1000); + assertFalse(thread.isAlive()); + } +}