http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java new file mode 100644 index 0000000..4e54bf1 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaOffsetBackingStore.class) +@PowerMockIgnore("javax.management.*") +public class KafkaOffsetBackingStoreTest { + private static final String TOPIC = "connect-offsets"; + private static final Map<String, String> DEFAULT_PROPS = new HashMap<>(); + static { + DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC); + DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); + } + private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>(); + static { + FIRST_SET.put(buffer("key"), buffer("value")); + FIRST_SET.put(null, null); + } + + private static final ByteBuffer TP0_KEY = buffer("TP0KEY"); + private static final ByteBuffer TP1_KEY = buffer("TP1KEY"); + private static final ByteBuffer TP2_KEY = buffer("TP2KEY"); + private static final ByteBuffer TP0_VALUE = buffer("VAL0"); + private static final ByteBuffer TP1_VALUE = buffer("VAL1"); + private static final ByteBuffer TP2_VALUE = buffer("VAL2"); + private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW"); + private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW"); + + @Mock + KafkaBasedLog<byte[], byte[]> storeLog; + private KafkaOffsetBackingStore store; + + private Capture<String> capturedTopic = EasyMock.newCapture(); + private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture(); + private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture(); + private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture(); + + @Before + public void setUp() throws Exception { + store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"}); + } + + @Test(expected = ConnectException.class) + public void testMissingTopic() { + store = new KafkaOffsetBackingStore(); + store.configure(Collections.<String, Object>emptyMap()); + } + + @Test + public void testStartStop() throws Exception { + expectConfigure(); + expectStart(Collections.EMPTY_LIST); + expectStop(); + + PowerMock.replayAll(); + + store.configure(DEFAULT_PROPS); + assertEquals(TOPIC, capturedTopic.getValue()); + assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); + assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); + assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); + assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); + + store.start(); + store.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testReloadOnStart() throws Exception { + expectConfigure(); + expectStart(Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()), + new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()), + new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()), + new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()) + )); + expectStop(); + + PowerMock.replayAll(); + + store.configure(DEFAULT_PROPS); + store.start(); + HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data"); + assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY)); + assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY)); + + store.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testGetSet() throws Exception { + expectConfigure(); + expectStart(Collections.EMPTY_LIST); + expectStop(); + + // First get() against an empty store + final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture(); + storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback)); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + firstGetReadToEndCallback.getValue().onCompletion(null, null); + return null; + } + }); + + // Set offsets + Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); + storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0)); + PowerMock.expectLastCall(); + Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); + storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1)); + PowerMock.expectLastCall(); + + // Second get() should get the produced data and return the new values + final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture(); + storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback)); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array())); + secondGetReadToEndCallback.getValue().onCompletion(null, null); + return null; + } + }); + + // Third get() should pick up data produced by someone else and return those values + final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture(); + storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback)); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())); + thirdGetReadToEndCallback.getValue().onCompletion(null, null); + return null; + } + }); + + PowerMock.replayAll(); + + + + store.configure(DEFAULT_PROPS); + store.start(); + + // Getting from empty store should return nulls + final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false); + store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() { + @Override + public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) { + // Since we didn't read them yet, these will be null + assertEquals(null, result.get(TP0_KEY)); + assertEquals(null, result.get(TP1_KEY)); + getInvokedAndPassed.set(true); + } + }).get(10000, TimeUnit.MILLISECONDS); + assertTrue(getInvokedAndPassed.get()); + + // Set some offsets + Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>(); + toSet.put(TP0_KEY, TP0_VALUE); + toSet.put(TP1_KEY, TP1_VALUE); + final AtomicBoolean invoked = new AtomicBoolean(false); + Future<Void> setFuture = store.set(toSet, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + invoked.set(true); + } + }); + assertFalse(setFuture.isDone()); + // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback + // for the store's set callback + callback1.getValue().onCompletion(null, null); + assertFalse(invoked.get()); + callback0.getValue().onCompletion(null, null); + setFuture.get(10000, TimeUnit.MILLISECONDS); + assertTrue(invoked.get()); + + // Getting data should read to end of our published data and return it + final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false); + store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() { + @Override + public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) { + assertEquals(TP0_VALUE, result.get(TP0_KEY)); + assertEquals(TP1_VALUE, result.get(TP1_KEY)); + secondGetInvokedAndPassed.set(true); + } + }).get(10000, TimeUnit.MILLISECONDS); + assertTrue(secondGetInvokedAndPassed.get()); + + // Getting data should read to end of our published data and return it + final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false); + store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() { + @Override + public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) { + assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY)); + assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY)); + thirdGetInvokedAndPassed.set(true); + } + }).get(10000, TimeUnit.MILLISECONDS); + assertTrue(thirdGetInvokedAndPassed.get()); + + store.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testSetFailure() throws Exception { + expectConfigure(); + expectStart(Collections.EMPTY_LIST); + expectStop(); + + // Set offsets + Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); + storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0)); + PowerMock.expectLastCall(); + Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); + storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1)); + PowerMock.expectLastCall(); + Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture(); + storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2)); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + + + store.configure(DEFAULT_PROPS); + store.start(); + + // Set some offsets + Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>(); + toSet.put(TP0_KEY, TP0_VALUE); + toSet.put(TP1_KEY, TP1_VALUE); + toSet.put(TP2_KEY, TP2_VALUE); + final AtomicBoolean invoked = new AtomicBoolean(false); + final AtomicBoolean invokedFailure = new AtomicBoolean(false); + Future<Void> setFuture = store.set(toSet, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + invoked.set(true); + if (error != null) + invokedFailure.set(true); + } + }); + assertFalse(setFuture.isDone()); + // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback + // for the store's set callback + callback1.getValue().onCompletion(null, null); + assertFalse(invoked.get()); + callback2.getValue().onCompletion(null, new KafkaException("bogus error")); + assertTrue(invoked.get()); + assertTrue(invokedFailure.get()); + callback0.getValue().onCompletion(null, null); + try { + setFuture.get(10000, TimeUnit.MILLISECONDS); + fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future"); + } catch (ExecutionException e) { + // expected + assertNotNull(e.getCause()); + assertTrue(e.getCause() instanceof KafkaException); + } + + store.stop(); + + PowerMock.verifyAll(); + } + + private void expectConfigure() throws Exception { + PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), + EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback)) + .andReturn(storeLog); + } + + private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) throws Exception { + storeLog.start(); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords) + capturedConsumedCallback.getValue().onCompletion(null, rec); + return null; + } + }); + } + + private void expectStop() { + storeLog.stop(); + PowerMock.expectLastCall(); + } + + private static ByteBuffer buffer(String v) { + return ByteBuffer.wrap(v.getBytes()); + } + +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java new file mode 100644 index 0000000..3e0347c --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.Callback; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(PowerMockRunner.class) +public class OffsetStorageWriterTest { + private static final String NAMESPACE = "namespace"; + // Connect format - any types should be accepted here + private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12); + + // Serialized + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); + + @Mock private OffsetBackingStore store; + @Mock private Converter keyConverter; + @Mock private Converter valueConverter; + private OffsetStorageWriter writer; + + private static Exception exception = new RuntimeException("error"); + + private ExecutorService service; + + @Before + public void setup() { + writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter); + service = Executors.newFixedThreadPool(1); + } + + @After + public void teardown() { + service.shutdownNow(); + } + + @Test + public void testWriteFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); + + PowerMock.replayAll(); + + writer.offset(OFFSET_KEY, OFFSET_VALUE); + + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + // It should be possible to set offset values to null + @Test + public void testWriteNullValueFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null); + + PowerMock.replayAll(); + + writer.offset(OFFSET_KEY, null); + + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + // It should be possible to use null keys. These aren't actually stored as null since the key is wrapped to include + // info about the namespace (connector) + @Test + public void testWriteNullKeyFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); + + PowerMock.replayAll(); + + writer.offset(null, OFFSET_VALUE); + + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + @Test + public void testNoOffsetsToFlush() { + // If no offsets are flushed, we should finish immediately and not have made any calls to the + // underlying storage layer + + PowerMock.replayAll(); + + // Should not return a future + assertFalse(writer.beginFlush()); + + PowerMock.verifyAll(); + } + + @Test + public void testFlushFailureReplacesOffsets() throws Exception { + // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored + // such that a subsequent flush will write them. + + @SuppressWarnings("unchecked") + final Callback<Void> callback = PowerMock.createMock(Callback.class); + // First time the write fails + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null); + // Second time it succeeds + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); + // Third time it has no data to flush so we won't get past beginFlush() + + PowerMock.replayAll(); + + writer.offset(OFFSET_KEY, OFFSET_VALUE); + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + assertFalse(writer.beginFlush()); + + PowerMock.verifyAll(); + } + + @Test(expected = ConnectException.class) + public void testAlreadyFlushing() throws Exception { + @SuppressWarnings("unchecked") + final Callback<Void> callback = PowerMock.createMock(Callback.class); + // Trigger the send, but don't invoke the callback so we'll still be mid-flush + CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown); + + PowerMock.replayAll(); + + writer.offset(OFFSET_KEY, OFFSET_VALUE); + assertTrue(writer.beginFlush()); + writer.doFlush(callback); + assertTrue(writer.beginFlush()); // should throw + + PowerMock.verifyAll(); + } + + @Test + public void testCancelBeforeAwaitFlush() { + PowerMock.replayAll(); + + writer.offset(OFFSET_KEY, OFFSET_VALUE); + assertTrue(writer.beginFlush()); + writer.cancelFlush(); + + PowerMock.verifyAll(); + } + + @Test + public void testCancelAfterAwaitFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); + // In this test, the write should be cancelled so the callback will not be invoked and is not + // passed to the expectStore call + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown); + + PowerMock.replayAll(); + + writer.offset(OFFSET_KEY, OFFSET_VALUE); + assertTrue(writer.beginFlush()); + // Start the flush, then immediately cancel before allowing the mocked store request to finish + Future<Void> flushFuture = writer.doFlush(callback); + writer.cancelFlush(); + allowStoreCompleteCountdown.countDown(); + flushFuture.get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + /** + * Expect a request to store data to the underlying OffsetBackingStore. + * + * @param key the key for the offset + * @param keySerialized serialized version of the key + * @param value the value for the offset + * @param valueSerialized serialized version of the value + * @param callback the callback to invoke when completed, or null if the callback isn't + * expected to be invoked + * @param fail if true, treat + * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before + * invoking the callback. A (generous) timeout is still imposed to + * ensure tests complete. + * @return the captured set of ByteBuffer key-value pairs passed to the storage layer + */ + private void expectStore(Map<String, String> key, byte[] keySerialized, + Map<String, Integer> value, byte[] valueSerialized, + final Callback<Void> callback, + final boolean fail, + final CountDownLatch waitForCompletion) { + List<Object> keyWrapped = Arrays.asList(NAMESPACE, key); + EasyMock.expect(keyConverter.fromConnectData(NAMESPACE, null, keyWrapped)).andReturn(keySerialized); + EasyMock.expect(valueConverter.fromConnectData(NAMESPACE, null, value)).andReturn(valueSerialized); + + final Capture<Callback<Void>> storeCallback = Capture.newInstance(); + final Map<ByteBuffer, ByteBuffer> offsetsSerialized = Collections.singletonMap( + keySerialized == null ? null : ByteBuffer.wrap(keySerialized), + valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized)); + EasyMock.expect(store.set(EasyMock.eq(offsetsSerialized), EasyMock.capture(storeCallback))) + .andAnswer(new IAnswer<Future<Void>>() { + @Override + public Future<Void> answer() throws Throwable { + return service.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (waitForCompletion != null) + assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS)); + + if (fail) { + storeCallback.getValue().onCompletion(exception, null); + } else { + storeCallback.getValue().onCompletion(null, null); + } + return null; + } + }); + } + }); + if (callback != null) { + if (fail) { + callback.onCompletion(EasyMock.eq(exception), EasyMock.eq((Void) null)); + } else { + callback.onCompletion(null, null); + } + } + PowerMock.expectLastCall(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java new file mode 100644 index 0000000..4d17ac4 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.easymock.EasyMock; +import org.easymock.IArgumentMatcher; + +import java.util.Arrays; + +public class ByteArrayProducerRecordEquals implements IArgumentMatcher { + private ProducerRecord<byte[], byte[]> record; + + public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) { + EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in)); + return null; + } + + public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) { + this.record = record; + } + + @Override + public boolean matches(Object argument) { + if (!(argument instanceof ProducerRecord)) + return false; + ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument; + return record.topic().equals(other.topic()) && + record.partition() != null ? record.partition().equals(other.partition()) : other.partition() == null && + record.key() != null ? Arrays.equals(record.key(), other.key()) : other.key() == null && + record.value() != null ? Arrays.equals(record.value(), other.value()) : other.value() == null; + } + + @Override + public void appendTo(StringBuffer buffer) { + buffer.append(record.toString()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java new file mode 100644 index 0000000..1c3b842 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -0,0 +1,437 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Time; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaBasedLog.class) +@PowerMockIgnore("javax.management.*") +public class KafkaBasedLogTest { + + private static final String TOPIC = "connect-log"; + private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0); + private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1); + private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>(); + static { + PRODUCER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); + PRODUCER_PROPS.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + PRODUCER_PROPS.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + } + private static final Map<String, Object> CONSUMER_PROPS = new HashMap<>(); + static { + CONSUMER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); + CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + } + + private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1)); + private static final Map<String, String> FIRST_SET = new HashMap<>(); + static { + FIRST_SET.put("key", "value"); + FIRST_SET.put(null, null); + } + + private static final Node LEADER = new Node(1, "broker1", 9092); + private static final Node REPLICA = new Node(1, "broker2", 9093); + + private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA}); + private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA}); + + private static final String TP0_KEY = "TP0KEY"; + private static final String TP1_KEY = "TP1KEY"; + private static final String TP0_VALUE = "VAL0"; + private static final String TP1_VALUE = "VAL1"; + private static final String TP0_VALUE_NEW = "VAL0_NEW"; + private static final String TP1_VALUE_NEW = "VAL1_NEW"; + + private Time time = new MockTime(); + private KafkaBasedLog<String, String> store; + + @Mock + private KafkaProducer<String, String> producer; + private MockConsumer<String, String> consumer; + + private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>(); + private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() { + @Override + public void onCompletion(Throwable error, ConsumerRecord<String, String> record) { + consumedRecords.add(record); + } + }; + + @Before + public void setUp() throws Exception { + store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, + TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time); + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); + Map<TopicPartition, Long> beginningOffsets = new HashMap<>(); + beginningOffsets.put(TP0, 0L); + beginningOffsets.put(TP1, 0L); + consumer.updateBeginningOffsets(beginningOffsets); + } + + @Test + public void testStartStop() throws Exception { + expectStart(); + expectStop(); + + PowerMock.replayAll(); + + Map<TopicPartition, Long> endOffsets = new HashMap<>(); + endOffsets.put(TP0, 0L); + endOffsets.put(TP1, 0L); + consumer.updateEndOffsets(endOffsets); + store.start(); + assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); + + store.stop(); + + assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertTrue(consumer.closed()); + PowerMock.verifyAll(); + } + + @Test + public void testReloadOnStart() throws Exception { + expectStart(); + expectStop(); + + PowerMock.replayAll(); + + Map<TopicPartition, Long> endOffsets = new HashMap<>(); + endOffsets.put(TP0, 1L); + endOffsets.put(TP1, 1L); + consumer.updateEndOffsets(endOffsets); + final CountDownLatch finishedLatch = new CountDownLatch(1); + consumer.schedulePollTask(new Runnable() { // Use first poll task to setup sequence of remaining responses to polls + @Override + public void run() { + // Should keep polling until it reaches current log end offset for all partitions. Should handle + // as many empty polls as needed + consumer.scheduleNopPollTask(); + consumer.scheduleNopPollTask(); + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE)); + } + }); + consumer.scheduleNopPollTask(); + consumer.scheduleNopPollTask(); + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE)); + } + }); + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + finishedLatch.countDown(); + } + }); + } + }); + store.start(); + assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS)); + + assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); + assertEquals(2, consumedRecords.size()); + assertEquals(TP0_VALUE, consumedRecords.get(0).value()); + assertEquals(TP1_VALUE, consumedRecords.get(1).value()); + + store.stop(); + + assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertTrue(consumer.closed()); + PowerMock.verifyAll(); + } + + @Test + public void testSendAndReadToEnd() throws Exception { + expectStart(); + TestFuture<RecordMetadata> tp0Future = new TestFuture<>(); + ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); + Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); + EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); + TestFuture<RecordMetadata> tp1Future = new TestFuture<>(); + ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE); + Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); + EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future); + + // Producer flushes when read to log end is called + producer.flush(); + PowerMock.expectLastCall(); + + expectStop(); + + PowerMock.replayAll(); + + Map<TopicPartition, Long> endOffsets = new HashMap<>(); + endOffsets.put(TP0, 0L); + endOffsets.put(TP1, 0L); + consumer.updateEndOffsets(endOffsets); + store.start(); + assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); + assertEquals(0L, consumer.position(TP0)); + assertEquals(0L, consumer.position(TP1)); + + // Set some keys + final AtomicInteger invoked = new AtomicInteger(0); + org.apache.kafka.clients.producer.Callback producerCallback = new org.apache.kafka.clients.producer.Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + invoked.incrementAndGet(); + } + }; + store.send(TP0_KEY, TP0_VALUE, producerCallback); + store.send(TP1_KEY, TP1_VALUE, producerCallback); + assertEquals(0, invoked.get()); + tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing + callback1.getValue().onCompletion(null, null); + assertEquals(1, invoked.get()); + tp0Future.resolve((RecordMetadata) null); + callback0.getValue().onCompletion(null, null); + assertEquals(2, invoked.get()); + + // Now we should have to wait for the records to be read back when we call readToEnd() + final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false); + final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + assertEquals(4, consumedRecords.size()); + assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value()); + assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value()); + getInvokedAndPassed.set(true); + } + }); + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events + // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without + // returning any data. + store.readToEnd(readEndFutureCallback); + + // Needs to seek to end to find end offsets + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + Map<TopicPartition, Long> newEndOffsets = new HashMap<>(); + newEndOffsets.put(TP0, 2L); + newEndOffsets.put(TP1, 2L); + consumer.updateEndOffsets(newEndOffsets); + } + }); + + // Should keep polling until it reaches current log end offset for all partitions + consumer.scheduleNopPollTask(); + consumer.scheduleNopPollTask(); + consumer.scheduleNopPollTask(); + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE)); + } + }); + + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW)); + } + }); + + // Already have FutureCallback that should be invoked/awaited, so no need for follow up finishedLatch + } + }); + readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS); + assertTrue(getInvokedAndPassed.get()); + + // Cleanup + store.stop(); + + assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertTrue(consumer.closed()); + PowerMock.verifyAll(); + } + + @Test + public void testConsumerError() throws Exception { + expectStart(); + expectStop(); + + PowerMock.replayAll(); + + final CountDownLatch finishedLatch = new CountDownLatch(1); + Map<TopicPartition, Long> endOffsets = new HashMap<>(); + endOffsets.put(TP0, 1L); + endOffsets.put(TP1, 1L); + consumer.updateEndOffsets(endOffsets); + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + // Trigger exception + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception()); + } + }); + + // Should keep polling until it reaches current log end offset for all partitions + consumer.scheduleNopPollTask(); + consumer.scheduleNopPollTask(); + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW)); + consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW)); + } + }); + + consumer.schedulePollTask(new Runnable() { + @Override + public void run() { + finishedLatch.countDown(); + } + }); + } + }); + store.start(); + assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS)); + assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); + assertEquals(1L, consumer.position(TP0)); + + store.stop(); + + assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertTrue(consumer.closed()); + PowerMock.verifyAll(); + } + + @Test + public void testProducerError() throws Exception { + expectStart(); + TestFuture<RecordMetadata> tp0Future = new TestFuture<>(); + ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); + Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); + EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); + + expectStop(); + + PowerMock.replayAll(); + + Map<TopicPartition, Long> endOffsets = new HashMap<>(); + endOffsets.put(TP0, 0L); + endOffsets.put(TP1, 0L); + consumer.updateEndOffsets(endOffsets); + store.start(); + assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); + assertEquals(0L, consumer.position(TP0)); + assertEquals(0L, consumer.position(TP1)); + + final AtomicReference<Throwable> setException = new AtomicReference<>(); + store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + assertNull(setException.get()); // Should only be invoked once + setException.set(exception); + } + }); + KafkaException exc = new LeaderNotAvailableException("Error"); + tp0Future.resolve(exc); + callback0.getValue().onCompletion(null, exc); + assertNotNull(setException.get()); + + store.stop(); + + assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertTrue(consumer.closed()); + PowerMock.verifyAll(); + } + + + private void expectStart() throws Exception { + PowerMock.expectPrivate(store, "createProducer") + .andReturn(producer); + PowerMock.expectPrivate(store, "createConsumer") + .andReturn(consumer); + } + + private void expectStop() { + producer.close(); + PowerMock.expectLastCall(); + // MockConsumer close is checked after test. + } + + private static ByteBuffer buffer(String v) { + return ByteBuffer.wrap(v.getBytes()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java new file mode 100644 index 0000000..85f6895 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.utils.Time; + +import java.util.concurrent.TimeUnit; + +/** + * A clock that you can manually advance by calling sleep + */ +public class MockTime implements Time { + + private long nanos = 0; + + public MockTime() { + this.nanos = System.nanoTime(); + } + + @Override + public long milliseconds() { + return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS); + } + + @Override + public long nanoseconds() { + return nanos; + } + + @Override + public void sleep(long ms) { + this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java new file mode 100644 index 0000000..5c3c224 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ShutdownableThreadTest { + + @Test + public void testGracefulShutdown() throws InterruptedException { + ShutdownableThread thread = new ShutdownableThread("graceful") { + @Override + public void execute() { + while (getRunning()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // Ignore + } + } + } + }; + thread.start(); + Thread.sleep(10); + assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS)); + } + + @Test + public void testForcibleShutdown() throws InterruptedException { + final CountDownLatch startedLatch = new CountDownLatch(1); + ShutdownableThread thread = new ShutdownableThread("forcible") { + @Override + public void execute() { + try { + startedLatch.countDown(); + Thread.sleep(100000); + } catch (InterruptedException e) { + // Ignore + } + } + }; + thread.start(); + startedLatch.await(); + thread.forceShutdown(); + // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in + // certain conditions, but in this case we know the thread is interruptible so we should be + // able join() it + thread.join(1000); + assertFalse(thread.isAlive()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java new file mode 100644 index 0000000..12bac98 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +/** + * An UncaughtExceptionHandler that can be registered with one or more threads which tracks the + * first exception so the main thread can check for uncaught exceptions. + */ +public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExceptionHandler { + private Throwable firstException = null; + + @Override + public void uncaughtException(Thread t, Throwable e) { + if (this.firstException == null) + this.firstException = e; + } + + public void verifyNoExceptions() { + if (this.firstException != null) + throw new AssertionError(this.firstException); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java new file mode 100644 index 0000000..3683f91 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class TestFuture<T> implements Future<T> { + private volatile boolean resolved; + private T result; + private Throwable exception; + private CountDownLatch getCalledLatch; + + private volatile boolean resolveOnGet; + private T resolveOnGetResult; + private Throwable resolveOnGetException; + + public TestFuture() { + resolved = false; + getCalledLatch = new CountDownLatch(1); + + resolveOnGet = false; + resolveOnGetResult = null; + resolveOnGetException = null; + } + + public void resolve(T val) { + this.result = val; + resolved = true; + synchronized (this) { + this.notifyAll(); + } + } + + public void resolve(Throwable t) { + exception = t; + resolved = true; + synchronized (this) { + this.notifyAll(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return resolved; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + getCalledLatch.countDown(); + while (true) { + try { + return get(Integer.MAX_VALUE, TimeUnit.DAYS); + } catch (TimeoutException e) { + // ignore + } + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + getCalledLatch.countDown(); + + if (resolveOnGet) { + if (resolveOnGetException != null) + resolve(resolveOnGetException); + else + resolve(resolveOnGetResult); + } + + synchronized (this) { + while (!resolved) { + this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); + } + } + + if (exception != null) { + if (exception instanceof TimeoutException) + throw (TimeoutException) exception; + else if (exception instanceof InterruptedException) + throw (InterruptedException) exception; + else + throw new ExecutionException(exception); + } + return result; + } + + /** + * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately. + * @param val the value to return from the future + */ + public void resolveOnGet(T val) { + resolveOnGet = true; + resolveOnGetResult = val; + } + + /** + * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately. + * @param t the exception to return from the future + */ + public void resolveOnGet(Throwable t) { + resolveOnGet = true; + resolveOnGetException = t; + } + + /** + * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with + * the specified value. + * @param val the value to return from the future + */ + public void waitForGetAndResolve(T val) { + waitForGet(); + resolve(val); + } + + /** + * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with + * the specified value. + * @param t the exception to use to resolve the future + */ + public void waitForGetAndResolve(Throwable t) { + waitForGet(); + resolve(t); + } + + private void waitForGet() { + try { + getCalledLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException("Unexpected interruption: ", e); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java new file mode 100644 index 0000000..0241ea3 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import org.junit.After; +import org.junit.Before; + +/** + * Base class for tests that use threads. It sets up uncaught exception handlers for all known + * thread classes and checks for errors at the end of the test so that failures in background + * threads will cause the test to fail. + */ +public class ThreadedTest { + + protected TestBackgroundThreadExceptionHandler backgroundThreadExceptionHandler; + + @Before + public void setup() { + backgroundThreadExceptionHandler = new TestBackgroundThreadExceptionHandler(); + ShutdownableThread.funcaughtExceptionHandler = backgroundThreadExceptionHandler; + } + + @After + public void teardown() { + backgroundThreadExceptionHandler.verifyNoExceptions(); + ShutdownableThread.funcaughtExceptionHandler = null; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/resources/log4j.properties b/connect/runtime/src/test/resources/log4j.properties new file mode 100644 index 0000000..d5e90fe --- /dev/null +++ b/connect/runtime/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## +log4j.rootLogger=OFF, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +log4j.logger.org.apache.kafka=ERROR http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java deleted file mode 100644 index 4d0e1bd..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.connector; - -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.List; -import java.util.Map; - -/** - * <p> - * Connectors manage integration of Copycat with another system, either as an input that ingests - * data into Kafka or an output that passes data to an external system. Implementations should - * not use this class directly; they should inherit from SourceConnector or SinkConnector. - * </p> - * <p> - * Connectors have two primary tasks. First, given some configuration, they are responsible for - * creating configurations for a set of {@link Task}s that split up the data processing. For - * example, a database Connector might create Tasks by dividing the set of tables evenly among - * tasks. Second, they are responsible for monitoring inputs for changes that require - * reconfiguration and notifying the Copycat runtime via the ConnectorContext. Continuing the - * previous example, the connector might periodically check for new tables and notify Copycat of - * additions and deletions. Copycat will then request new configurations and update the running - * Tasks. - * </p> - */ -@InterfaceStability.Unstable -public abstract class Connector { - - protected ConnectorContext context; - - /** - * Get the version of this connector. - * - * @return the version, formatted as a String - */ - public abstract String version(); - - /** - * Initialize this connector, using the provided ConnectorContext to notify the runtime of - * input configuration changes. - * @param ctx context object used to interact with the Copycat runtime - */ - public void initialize(ConnectorContext ctx) { - context = ctx; - } - - /** - * <p> - * Initialize this connector, using the provided ConnectorContext to notify the runtime of - * input configuration changes and using the provided set of Task configurations. - * This version is only used to recover from failures. - * </p> - * <p> - * The default implementation ignores the provided Task configurations. During recovery, Copycat will request - * an updated set of configurations and update the running Tasks appropriately. However, Connectors should - * implement special handling of this case if it will avoid unnecessary changes to running Tasks. - * </p> - * - * @param ctx context object used to interact with the Copycat runtime - * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid - * churn in partition to task assignments - */ - public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) { - context = ctx; - // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs - // are very different, but reduces the difficulty of implementing a Connector - } - - /** - * Start this Connector. This method will only be called on a clean Connector, i.e. it has - * either just been instantiated and initialized or {@link #stop()} has been invoked. - * - * @param props configuration settings - */ - public abstract void start(Map<String, String> props); - - /** - * Reconfigure this Connector. Most implementations will not override this, using the default - * implementation that calls {@link #stop()} followed by {@link #start(Map)}. - * Implementations only need to override this if they want to handle this process more - * efficiently, e.g. without shutting down network connections to the external system. - * - * @param props new configuration settings - */ - public void reconfigure(Map<String, String> props) { - stop(); - start(props); - } - - /** - * Returns the Task implementation for this Connector. - */ - public abstract Class<? extends Task> taskClass(); - - /** - * Returns a set of configurations for Tasks based on the current configuration, - * producing at most count configurations. - * - * @param maxTasks maximum number of configurations to generate - * @return configurations for Tasks - */ - public abstract List<Map<String, String>> taskConfigs(int maxTasks); - - /** - * Stop this connector. - */ - public abstract void stop(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java deleted file mode 100644 index ecba69a..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.connector; - -import org.apache.kafka.common.annotation.InterfaceStability; - -/** - * ConnectorContext allows Connectors to proactively interact with the Copycat runtime. - */ -@InterfaceStability.Unstable -public interface ConnectorContext { - /** - * Requests that the runtime reconfigure the Tasks for this source. This should be used to - * indicate to the runtime that something about the input/output has changed (e.g. partitions - * added/removed) and the running Tasks will need to be modified. - */ - void requestTaskReconfiguration(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java deleted file mode 100644 index 0d3e8dc..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.connector; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.data.Schema; - -/** - * <p> - * Base class for records containing data to be copied to/from Kafka. This corresponds closely to - * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both - * sources and sinks (topic, kafkaPartition, key, value). Although both implementations include a - * notion of offset, it is not included here because they differ in type. - * </p> - */ -@InterfaceStability.Unstable -public abstract class CopycatRecord { - private final String topic; - private final Integer kafkaPartition; - private final Schema keySchema; - private final Object key; - private final Schema valueSchema; - private final Object value; - - public CopycatRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) { - this(topic, kafkaPartition, null, null, valueSchema, value); - } - - public CopycatRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) { - this.topic = topic; - this.kafkaPartition = kafkaPartition; - this.keySchema = keySchema; - this.key = key; - this.valueSchema = valueSchema; - this.value = value; - } - - public String topic() { - return topic; - } - - public Integer kafkaPartition() { - return kafkaPartition; - } - - public Object key() { - return key; - } - - public Schema keySchema() { - return keySchema; - } - - public Object value() { - return value; - } - - public Schema valueSchema() { - return valueSchema; - } - - @Override - public String toString() { - return "CopycatRecord{" + - "topic='" + topic + '\'' + - ", kafkaPartition=" + kafkaPartition + - ", key=" + key + - ", value=" + value + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - CopycatRecord that = (CopycatRecord) o; - - if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null) - return false; - if (topic != null ? !topic.equals(that.topic) : that.topic != null) - return false; - if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null) - return false; - if (key != null ? !key.equals(that.key) : that.key != null) - return false; - if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null) - return false; - if (value != null ? !value.equals(that.value) : that.value != null) - return false; - - return true; - } - - @Override - public int hashCode() { - int result = topic != null ? topic.hashCode() : 0; - result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0); - result = 31 * result + (keySchema != null ? keySchema.hashCode() : 0); - result = 31 * result + (key != null ? key.hashCode() : 0); - result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0); - result = 31 * result + (value != null ? value.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java deleted file mode 100644 index cb8b719..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.connector; - -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.Map; - -/** - * <p> - * Tasks contain the code that actually copies data to/from another system. They receive - * a configuration from their parent Connector, assigning them a fraction of a Copycat job's work. - * The Copycat framework then pushes/pulls data from the Task. The Task must also be able to - * respond to reconfiguration requests. - * </p> - * <p> - * Task only contains the minimal shared functionality between - * {@link org.apache.kafka.copycat.source.SourceTask} and - * {@link org.apache.kafka.copycat.sink.SinkTask}. - * </p> - */ -@InterfaceStability.Unstable -public interface Task { - /** - * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version. - * - * @return the version, formatted as a String - */ - String version(); - - /** - * Start the Task - * @param props initial configuration - */ - void start(Map<String, String> props); - - /** - * Stop this task. - */ - void stop(); -}