http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java deleted file mode 100644 index 69d9ab4..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java +++ /dev/null @@ -1,357 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.kafka.copycat.storage;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.KafkaBasedLog;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Unit tests for {@link KafkaOffsetBackingStore}.
 *
 * <p>The store under test is a partial mock whose {@code createKafkaBasedLog} method is stubbed to return the
 * mocked {@link KafkaBasedLog} ({@link #storeLog}). The arguments the store passes to that factory method are
 * captured so tests can inspect the generated client configs and drive the "record consumed" callback by hand.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(KafkaOffsetBackingStore.class)
@PowerMockIgnore("javax.management.*")
public class KafkaOffsetBackingStoreTest {
    private static final String TOPIC = "copycat-offsets";
    private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
    static {
        DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
        DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
    }

    private static final ByteBuffer TP0_KEY = buffer("TP0KEY");
    private static final ByteBuffer TP1_KEY = buffer("TP1KEY");
    private static final ByteBuffer TP2_KEY = buffer("TP2KEY");
    private static final ByteBuffer TP0_VALUE = buffer("VAL0");
    private static final ByteBuffer TP1_VALUE = buffer("VAL1");
    private static final ByteBuffer TP2_VALUE = buffer("VAL2");
    private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
    private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");

    @Mock
    KafkaBasedLog<byte[], byte[]> storeLog;
    private KafkaOffsetBackingStore store;

    // Arguments passed by the store to createKafkaBasedLog(), captured in expectConfigure()
    private Capture<String> capturedTopic = EasyMock.newCapture();
    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();

    @Before
    public void setUp() throws Exception {
        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
    }

    /** Configuring a real (unmocked) store without the offsets topic setting must fail. */
    @Test(expected = CopycatException.class)
    public void testMissingTopic() {
        store = new KafkaOffsetBackingStore();
        store.configure(Collections.<String, Object>emptyMap());
    }

    /** Checks the configs handed to the log factory and that start/stop are forwarded to the log. */
    @Test
    public void testStartStop() throws Exception {
        expectConfigure();
        expectStart(Collections.<ConsumerRecord<byte[], byte[]>>emptyList());
        expectStop();

        PowerMock.replayAll();

        store.configure(DEFAULT_PROPS);
        assertEquals(TOPIC, capturedTopic.getValue());
        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));

        store.start();
        store.stop();

        PowerMock.verifyAll();
    }

    /** Records delivered while the log starts must end up in the store, with later records winning per key. */
    @Test
    public void testReloadOnStart() throws Exception {
        expectConfigure();
        expectStart(Arrays.asList(
                new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
                new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()),
                new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()),
                new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())
        ));
        expectStop();

        PowerMock.replayAll();

        store.configure(DEFAULT_PROPS);
        store.start();
        Map<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
        assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
        assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));

        store.stop();

        PowerMock.verifyAll();
    }

    /** Exercises get() on an empty store, set(), get() of our own writes and get() of someone else's writes. */
    @Test
    public void testGetSet() throws Exception {
        expectConfigure();
        expectStart(Collections.<ConsumerRecord<byte[], byte[]>>emptyList());
        expectStop();

        // First get() against an empty store
        final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
        storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                firstGetReadToEndCallback.getValue().onCompletion(null, null);
                return null;
            }
        });

        // Set offsets
        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
        PowerMock.expectLastCall();
        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
        PowerMock.expectLastCall();

        // Second get() should get the produced data and return the new values
        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
                secondGetReadToEndCallback.getValue().onCompletion(null, null);
                return null;
            }
        });

        // Third get() should pick up data produced by someone else and return those values
        final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
        storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()));
                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()));
                thirdGetReadToEndCallback.getValue().onCompletion(null, null);
                return null;
            }
        });

        PowerMock.replayAll();

        store.configure(DEFAULT_PROPS);
        store.start();

        // Getting from empty store should return nulls
        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
            @Override
            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
                // Since we didn't read them yet, these will be null
                assertEquals(null, result.get(TP0_KEY));
                assertEquals(null, result.get(TP1_KEY));
                getInvokedAndPassed.set(true);
            }
        }).get(10000, TimeUnit.MILLISECONDS);
        assertTrue(getInvokedAndPassed.get());

        // Set some offsets
        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
        toSet.put(TP0_KEY, TP0_VALUE);
        toSet.put(TP1_KEY, TP1_VALUE);
        final AtomicBoolean invoked = new AtomicBoolean(false);
        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) {
                invoked.set(true);
            }
        });
        assertFalse(setFuture.isDone());
        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
        // for the store's set callback
        callback1.getValue().onCompletion(null, null);
        assertFalse(invoked.get());
        callback0.getValue().onCompletion(null, null);
        setFuture.get(10000, TimeUnit.MILLISECONDS);
        assertTrue(invoked.get());

        // Getting data should read to end of our published data and return it
        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
            @Override
            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
                assertEquals(TP0_VALUE, result.get(TP0_KEY));
                assertEquals(TP1_VALUE, result.get(TP1_KEY));
                secondGetInvokedAndPassed.set(true);
            }
        }).get(10000, TimeUnit.MILLISECONDS);
        assertTrue(secondGetInvokedAndPassed.get());

        // Getting data again should read to the end of the log and return the newer values written by someone else
        final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
            @Override
            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
                assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
                assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
                thirdGetInvokedAndPassed.set(true);
            }
        }).get(10000, TimeUnit.MILLISECONDS);
        assertTrue(thirdGetInvokedAndPassed.get());

        store.stop();

        PowerMock.verifyAll();
    }

    /** A failure reported for any one send must fail the set() callback and the returned future. */
    @Test
    public void testSetFailure() throws Exception {
        expectConfigure();
        expectStart(Collections.<ConsumerRecord<byte[], byte[]>>emptyList());
        expectStop();

        // Set offsets
        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
        PowerMock.expectLastCall();
        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
        PowerMock.expectLastCall();
        Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
        storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        store.configure(DEFAULT_PROPS);
        store.start();

        // Set some offsets
        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
        toSet.put(TP0_KEY, TP0_VALUE);
        toSet.put(TP1_KEY, TP1_VALUE);
        toSet.put(TP2_KEY, TP2_VALUE);
        final AtomicBoolean invoked = new AtomicBoolean(false);
        final AtomicBoolean invokedFailure = new AtomicBoolean(false);
        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) {
                invoked.set(true);
                if (error != null)
                    invokedFailure.set(true);
            }
        });
        assertFalse(setFuture.isDone());
        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
        // for the store's set callback
        callback1.getValue().onCompletion(null, null);
        assertFalse(invoked.get());
        // The first error is reported immediately, without waiting for the remaining send to complete
        callback2.getValue().onCompletion(null, new KafkaException("bogus error"));
        assertTrue(invoked.get());
        assertTrue(invokedFailure.get());
        callback0.getValue().onCompletion(null, null);
        try {
            setFuture.get(10000, TimeUnit.MILLISECONDS);
            fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
        } catch (ExecutionException e) {
            // expected
            assertNotNull(e.getCause());
            assertTrue(e.getCause() instanceof KafkaException);
        }

        store.stop();

        PowerMock.verifyAll();
    }

    /** Expects the store to create its log, capturing every argument and handing back the mocked log. */
    private void expectConfigure() throws Exception {
        PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
                .andReturn(storeLog);
    }

    /**
     * Expects the log to be started; while starting, the given records are fed to the store's consumed-record
     * callback.
     *
     * @param preexistingRecords records to deliver, in order, during {@code start()}
     */
    private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) throws Exception {
        storeLog.start();
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords)
                    capturedConsumedCallback.getValue().onCompletion(null, rec);
                return null;
            }
        });
    }

    /** Expects the log to be stopped. */
    private void expectStop() {
        storeLog.stop();
        PowerMock.expectLastCall();
    }

    /** Wraps the UTF-8 bytes of {@code v} in an array-backed buffer (the tests rely on {@code array()}). */
    private static ByteBuffer buffer(String v) {
        return ByteBuffer.wrap(v.getBytes(StandardCharsets.UTF_8));
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java deleted file mode 100644 index 3dd0b52..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java +++ /dev/null @@ -1,272 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.kafka.copycat.storage;

import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Unit tests for {@link OffsetStorageWriter}.
 *
 * <p>The backing store and both converters are mocks. The mocked store completes each {@code set()} request on a
 * single-threaded executor, so completion is asynchronous with respect to the test thread.
 */
@RunWith(PowerMockRunner.class)
public class OffsetStorageWriterTest {
    private static final String NAMESPACE = "namespace";
    // Copycat format - any types should be accepted here
    private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
    private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);

    // Serialized
    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(StandardCharsets.UTF_8);
    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(StandardCharsets.UTF_8);

    // Error the mocked store reports when a test asks for a failed write
    private static final Exception EXCEPTION = new RuntimeException("error");

    @Mock private OffsetBackingStore store;
    @Mock private Converter keyConverter;
    @Mock private Converter valueConverter;
    private OffsetStorageWriter writer;

    // Runs the mocked store's asynchronous completions
    private ExecutorService service;

    @Before
    public void setup() {
        writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter);
        service = Executors.newFixedThreadPool(1);
    }

    @After
    public void teardown() {
        // Interrupts any store task still blocked on a latch so a test cannot leak a waiting thread
        service.shutdownNow();
    }

    @Test
    public void testWriteFlush() throws Exception {
        @SuppressWarnings("unchecked")
        Callback<Void> callback = PowerMock.createMock(Callback.class);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);

        PowerMock.replayAll();

        writer.offset(OFFSET_KEY, OFFSET_VALUE);

        assertTrue(writer.beginFlush());
        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    // It should be possible to set offset values to null
    @Test
    public void testWriteNullValueFlush() throws Exception {
        @SuppressWarnings("unchecked")
        Callback<Void> callback = PowerMock.createMock(Callback.class);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null);

        PowerMock.replayAll();

        writer.offset(OFFSET_KEY, null);

        assertTrue(writer.beginFlush());
        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    // It should be possible to use null keys. These aren't actually stored as null since the key is wrapped to include
    // info about the namespace (connector)
    @Test
    public void testWriteNullKeyFlush() throws Exception {
        @SuppressWarnings("unchecked")
        Callback<Void> callback = PowerMock.createMock(Callback.class);
        expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);

        PowerMock.replayAll();

        writer.offset(null, OFFSET_VALUE);

        assertTrue(writer.beginFlush());
        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    @Test
    public void testNoOffsetsToFlush() {
        // If no offsets are flushed, we should finish immediately and not have made any calls to the
        // underlying storage layer

        PowerMock.replayAll();

        // Nothing was written, so there is nothing to flush
        assertFalse(writer.beginFlush());

        PowerMock.verifyAll();
    }

    @Test
    public void testFlushFailureReplacesOffsets() throws Exception {
        // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
        // such that a subsequent flush will write them.

        @SuppressWarnings("unchecked")
        final Callback<Void> callback = PowerMock.createMock(Callback.class);
        // First time the write fails
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null);
        // Second time it succeeds
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
        // Third time it has no data to flush so we won't get past beginFlush()

        PowerMock.replayAll();

        writer.offset(OFFSET_KEY, OFFSET_VALUE);
        assertTrue(writer.beginFlush());
        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
        assertTrue(writer.beginFlush());
        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
        assertFalse(writer.beginFlush());

        PowerMock.verifyAll();
    }

    @Test(expected = CopycatException.class)
    public void testAlreadyFlushing() throws Exception {
        @SuppressWarnings("unchecked")
        final Callback<Void> callback = PowerMock.createMock(Callback.class);
        // Trigger the send, but don't invoke the callback so we'll still be mid-flush
        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);

        PowerMock.replayAll();

        writer.offset(OFFSET_KEY, OFFSET_VALUE);
        assertTrue(writer.beginFlush());
        writer.doFlush(callback);
        assertTrue(writer.beginFlush()); // should throw

        // Only reached if the expected exception was NOT thrown, in which case the test fails anyway
        PowerMock.verifyAll();
    }

    @Test
    public void testCancelBeforeAwaitFlush() {
        PowerMock.replayAll();

        writer.offset(OFFSET_KEY, OFFSET_VALUE);
        assertTrue(writer.beginFlush());
        writer.cancelFlush();

        PowerMock.verifyAll();
    }

    @Test
    public void testCancelAfterAwaitFlush() throws Exception {
        @SuppressWarnings("unchecked")
        Callback<Void> callback = PowerMock.createMock(Callback.class);
        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
        // In this test, the write should be cancelled so the callback will not be invoked and is not
        // passed to the expectStore call
        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);

        PowerMock.replayAll();

        writer.offset(OFFSET_KEY, OFFSET_VALUE);
        assertTrue(writer.beginFlush());
        // Start the flush, then immediately cancel before allowing the mocked store request to finish
        Future<Void> flushFuture = writer.doFlush(callback);
        writer.cancelFlush();
        allowStoreCompleteCountdown.countDown();
        flushFuture.get(1000, TimeUnit.MILLISECONDS);

        PowerMock.verifyAll();
    }

    /**
     * Expect a request to store data to the underlying OffsetBackingStore.
     *
     * <p>Records the converter calls for the namespace-wrapped key and for the value, then a single
     * {@code store.set()} call whose answer completes the store's callback on {@link #service}.
     *
     * @param key the key for the offset
     * @param keySerialized serialized version of the key
     * @param value the value for the offset
     * @param valueSerialized serialized version of the value
     * @param callback the callback to invoke when completed, or null if the callback isn't
     *                 expected to be invoked
     * @param fail if true, the mocked store completes its callback with {@link #EXCEPTION} instead of
     *             reporting success, and {@code callback} (when non-null) is expected to receive that error
     * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before
     *                          invoking the callback. A (generous) timeout is still imposed to
     *                          ensure tests complete.
     */
    private void expectStore(Map<String, String> key, byte[] keySerialized,
                             Map<String, Integer> value, byte[] valueSerialized,
                             final Callback<Void> callback,
                             final boolean fail,
                             final CountDownLatch waitForCompletion) {
        List<Object> keyWrapped = Arrays.asList(NAMESPACE, key);
        EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, keyWrapped)).andReturn(keySerialized);
        EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, value)).andReturn(valueSerialized);

        final Capture<Callback<Void>> storeCallback = Capture.newInstance();
        final Map<ByteBuffer, ByteBuffer> offsetsSerialized = Collections.singletonMap(
                keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
                valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized));
        EasyMock.expect(store.set(EasyMock.eq(offsetsSerialized), EasyMock.capture(storeCallback)))
                .andAnswer(new IAnswer<Future<Void>>() {
                    @Override
                    public Future<Void> answer() throws Throwable {
                        return service.submit(new Callable<Void>() {
                            @Override
                            public Void call() throws Exception {
                                if (waitForCompletion != null)
                                    assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));

                                if (fail) {
                                    storeCallback.getValue().onCompletion(EXCEPTION, null);
                                } else {
                                    storeCallback.getValue().onCompletion(null, null);
                                }
                                return null;
                            }
                        });
                    }
                });
        if (callback != null) {
            if (fail) {
                callback.onCompletion(EasyMock.eq(EXCEPTION), EasyMock.eq((Void) null));
            } else {
                callback.onCompletion(null, null);
            }
        }
        PowerMock.expectLastCall();
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java deleted file mode 100644 index 929ea85..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java +++ /dev/null @@ -1,53 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.kafka.copycat.util;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.easymock.EasyMock;
import org.easymock.IArgumentMatcher;

import java.util.Arrays;

/**
 * EasyMock argument matcher for {@code ProducerRecord<byte[], byte[]>} that compares key and value by array
 * content rather than by array identity.
 *
 * <p>A candidate matches only when topic, partition, key and value all agree with the expected record.
 * Partition, key and value may each be null, in which case the candidate's corresponding field must be null too.
 */
public class ByteArrayProducerRecordEquals implements IArgumentMatcher {
    private final ProducerRecord<byte[], byte[]> record;

    /**
     * Registers a matcher for the given record with EasyMock; use in place of the argument when recording
     * an expectation.
     *
     * @param in the record the actual argument is expected to equal
     * @return always null; the value is a placeholder, the matcher itself is reported via
     *         {@link EasyMock#reportMatcher(IArgumentMatcher)}
     */
    public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) {
        EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in));
        return null;
    }

    /**
     * @param record the expected record
     */
    public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
        this.record = record;
    }

    /**
     * @param argument the actual argument passed to the mocked method
     * @return true if {@code argument} is a ProducerRecord whose topic, partition, key and value all equal
     *         those of the expected record
     */
    @Override
    public boolean matches(Object argument) {
        if (!(argument instanceof ProducerRecord))
            return false;
        // Type parameters are erased; callers only use this matcher for byte[] keyed/valued records
        @SuppressWarnings("unchecked")
        ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument;
        // Every field must match. The previous chain of unparenthesised "?:" operators bound looser than "&&",
        // so a record with an equal topic and a non-null partition was accepted on partition equality alone.
        return record.topic().equals(other.topic())
                && nullSafeEquals(record.partition(), other.partition())
                && Arrays.equals(record.key(), other.key())         // Arrays.equals treats two nulls as equal
                && Arrays.equals(record.value(), other.value());
    }

    @Override
    public void appendTo(StringBuffer buffer) {
        buffer.append(record.toString());
    }

    /** Returns true if both references are null, or {@code expected} is non-null and equals {@code actual}. */
    private static boolean nullSafeEquals(Object expected, Object actual) {
        return expected == null ? actual == null : expected.equals(actual);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java deleted file mode 100644 index eb62c9e..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java +++ /dev/null @@ -1,437 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- **/ - -package org.apache.kafka.copycat.util; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.LeaderNotAvailableException; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.utils.Time; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static 
org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(KafkaBasedLog.class) -@PowerMockIgnore("javax.management.*") -public class KafkaBasedLogTest { - - private static final String TOPIC = "copycat-log"; - private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0); - private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1); - private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>(); - static { - PRODUCER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); - PRODUCER_PROPS.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - PRODUCER_PROPS.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - } - private static final Map<String, Object> CONSUMER_PROPS = new HashMap<>(); - static { - CONSUMER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); - CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - } - - private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1)); - private static final Map<String, String> FIRST_SET = new HashMap<>(); - static { - FIRST_SET.put("key", "value"); - FIRST_SET.put(null, null); - } - - private static final Node LEADER = new Node(1, "broker1", 9092); - private static final Node REPLICA = new Node(1, "broker2", 9093); - - private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA}); - private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA}); - - private static final String TP0_KEY = 
"TP0KEY"; - private static final String TP1_KEY = "TP1KEY"; - private static final String TP0_VALUE = "VAL0"; - private static final String TP1_VALUE = "VAL1"; - private static final String TP0_VALUE_NEW = "VAL0_NEW"; - private static final String TP1_VALUE_NEW = "VAL1_NEW"; - - private Time time = new MockTime(); - private KafkaBasedLog<String, String> store; - - @Mock - private KafkaProducer<String, String> producer; - private MockConsumer<String, String> consumer; - - private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>(); - private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() { - @Override - public void onCompletion(Throwable error, ConsumerRecord<String, String> record) { - consumedRecords.add(record); - } - }; - - @Before - public void setUp() throws Exception { - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time); - consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); - Map<TopicPartition, Long> beginningOffsets = new HashMap<>(); - beginningOffsets.put(TP0, 0L); - beginningOffsets.put(TP1, 0L); - consumer.updateBeginningOffsets(beginningOffsets); - } - - @Test - public void testStartStop() throws Exception { - expectStart(); - expectStop(); - - PowerMock.replayAll(); - - Map<TopicPartition, Long> endOffsets = new HashMap<>(); - endOffsets.put(TP0, 0L); - endOffsets.put(TP1, 0L); - consumer.updateEndOffsets(endOffsets); - store.start(); - assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - - store.stop(); - - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); - PowerMock.verifyAll(); - } - - @Test - public void testReloadOnStart() throws Exception { - expectStart(); - expectStop(); - - PowerMock.replayAll(); - - 
Map<TopicPartition, Long> endOffsets = new HashMap<>(); - endOffsets.put(TP0, 1L); - endOffsets.put(TP1, 1L); - consumer.updateEndOffsets(endOffsets); - final CountDownLatch finishedLatch = new CountDownLatch(1); - consumer.schedulePollTask(new Runnable() { // Use first poll task to setup sequence of remaining responses to polls - @Override - public void run() { - // Should keep polling until it reaches current log end offset for all partitions. Should handle - // as many empty polls as needed - consumer.scheduleNopPollTask(); - consumer.scheduleNopPollTask(); - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE)); - } - }); - consumer.scheduleNopPollTask(); - consumer.scheduleNopPollTask(); - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE)); - } - }); - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - finishedLatch.countDown(); - } - }); - } - }); - store.start(); - assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS)); - - assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(2, consumedRecords.size()); - assertEquals(TP0_VALUE, consumedRecords.get(0).value()); - assertEquals(TP1_VALUE, consumedRecords.get(1).value()); - - store.stop(); - - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); - PowerMock.verifyAll(); - } - - @Test - public void testSendAndReadToEnd() throws Exception { - expectStart(); - TestFuture<RecordMetadata> tp0Future = new TestFuture<>(); - ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); - Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); - 
TestFuture<RecordMetadata> tp1Future = new TestFuture<>(); - ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE); - Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future); - - // Producer flushes when read to log end is called - producer.flush(); - PowerMock.expectLastCall(); - - expectStop(); - - PowerMock.replayAll(); - - Map<TopicPartition, Long> endOffsets = new HashMap<>(); - endOffsets.put(TP0, 0L); - endOffsets.put(TP1, 0L); - consumer.updateEndOffsets(endOffsets); - store.start(); - assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(0L, consumer.position(TP0)); - assertEquals(0L, consumer.position(TP1)); - - // Set some keys - final AtomicInteger invoked = new AtomicInteger(0); - org.apache.kafka.clients.producer.Callback producerCallback = new org.apache.kafka.clients.producer.Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - invoked.incrementAndGet(); - } - }; - store.send(TP0_KEY, TP0_VALUE, producerCallback); - store.send(TP1_KEY, TP1_VALUE, producerCallback); - assertEquals(0, invoked.get()); - tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing - callback1.getValue().onCompletion(null, null); - assertEquals(1, invoked.get()); - tp0Future.resolve((RecordMetadata) null); - callback0.getValue().onCompletion(null, null); - assertEquals(2, invoked.get()); - - // Now we should have to wait for the records to be read back when we call readToEnd() - final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false); - final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() { - @Override - public void onCompletion(Throwable error, Void result) { - assertEquals(4, consumedRecords.size()); - assertEquals(TP0_VALUE_NEW, 
consumedRecords.get(2).value()); - assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value()); - getInvokedAndPassed.set(true); - } - }); - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events - // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without - // returning any data. - store.readToEnd(readEndFutureCallback); - - // Needs to seek to end to find end offsets - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - Map<TopicPartition, Long> newEndOffsets = new HashMap<>(); - newEndOffsets.put(TP0, 2L); - newEndOffsets.put(TP1, 2L); - consumer.updateEndOffsets(newEndOffsets); - } - }); - - // Should keep polling until it reaches current log end offset for all partitions - consumer.scheduleNopPollTask(); - consumer.scheduleNopPollTask(); - consumer.scheduleNopPollTask(); - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE)); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW)); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE)); - } - }); - - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW)); - } - }); - - // Already have FutureCallback that should be invoked/awaited, so no need for follow up finishedLatch - } - }); - readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS); - assertTrue(getInvokedAndPassed.get()); - - // Cleanup - store.stop(); - - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); - PowerMock.verifyAll(); - } - - @Test - public void testConsumerError() throws Exception { - expectStart(); - expectStop(); - - PowerMock.replayAll(); - - final 
CountDownLatch finishedLatch = new CountDownLatch(1); - Map<TopicPartition, Long> endOffsets = new HashMap<>(); - endOffsets.put(TP0, 1L); - endOffsets.put(TP1, 1L); - consumer.updateEndOffsets(endOffsets); - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - // Trigger exception - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception()); - } - }); - - // Should keep polling until it reaches current log end offset for all partitions - consumer.scheduleNopPollTask(); - consumer.scheduleNopPollTask(); - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW)); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW)); - } - }); - - consumer.schedulePollTask(new Runnable() { - @Override - public void run() { - finishedLatch.countDown(); - } - }); - } - }); - store.start(); - assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS)); - assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(1L, consumer.position(TP0)); - - store.stop(); - - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); - PowerMock.verifyAll(); - } - - @Test - public void testProducerError() throws Exception { - expectStart(); - TestFuture<RecordMetadata> tp0Future = new TestFuture<>(); - ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); - Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); - - expectStop(); - - PowerMock.replayAll(); - - Map<TopicPartition, Long> endOffsets = new HashMap<>(); - endOffsets.put(TP0, 0L); - endOffsets.put(TP1, 0L); - consumer.updateEndOffsets(endOffsets); - 
store.start(); - assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); - assertEquals(0L, consumer.position(TP0)); - assertEquals(0L, consumer.position(TP1)); - - final AtomicReference<Throwable> setException = new AtomicReference<>(); - store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - assertNull(setException.get()); // Should only be invoked once - setException.set(exception); - } - }); - KafkaException exc = new LeaderNotAvailableException("Error"); - tp0Future.resolve(exc); - callback0.getValue().onCompletion(null, exc); - assertNotNull(setException.get()); - - store.stop(); - - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); - PowerMock.verifyAll(); - } - - - private void expectStart() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); - } - - private void expectStop() { - producer.close(); - PowerMock.expectLastCall(); - // MockConsumer close is checked after test. - } - - private static ByteBuffer buffer(String v) { - return ByteBuffer.wrap(v.getBytes()); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java deleted file mode 100644 index 53149db..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.util; - -import org.apache.kafka.common.utils.Time; - -import java.util.concurrent.TimeUnit; - -/** - * A clock that you can manually advance by calling sleep - */ -public class MockTime implements Time { - - private long nanos = 0; - - public MockTime() { - this.nanos = System.nanoTime(); - } - - @Override - public long milliseconds() { - return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS); - } - - @Override - public long nanoseconds() { - return nanos; - } - - @Override - public void sleep(long ms) { - this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java deleted file mode 100644 index 4880ca1..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or 
more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class ShutdownableThreadTest { - - @Test - public void testGracefulShutdown() throws InterruptedException { - ShutdownableThread thread = new ShutdownableThread("graceful") { - @Override - public void execute() { - while (getRunning()) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - // Ignore - } - } - } - }; - thread.start(); - Thread.sleep(10); - assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS)); - } - - @Test - public void testForcibleShutdown() throws InterruptedException { - final CountDownLatch startedLatch = new CountDownLatch(1); - ShutdownableThread thread = new ShutdownableThread("forcible") { - @Override - public void execute() { - try { - startedLatch.countDown(); - Thread.sleep(100000); - } catch (InterruptedException e) { - // Ignore - } - } - }; - thread.start(); - startedLatch.await(); - thread.forceShutdown(); - // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in - // certain conditions, 
but in this case we know the thread is interruptible so we should be - // able join() it - thread.join(1000); - assertFalse(thread.isAlive()); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java deleted file mode 100644 index 5dc6d33..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -/** - * An UncaughtExceptionHandler that can be registered with one or more threads which tracks the - * first exception so the main thread can check for uncaught exceptions. 
- */ -public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExceptionHandler { - private Throwable firstException = null; - - @Override - public void uncaughtException(Thread t, Throwable e) { - if (this.firstException == null) - this.firstException = e; - } - - public void verifyNoExceptions() { - if (this.firstException != null) - throw new AssertionError(this.firstException); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java deleted file mode 100644 index 55e24c8..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.util; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class TestFuture<T> implements Future<T> { - private volatile boolean resolved; - private T result; - private Throwable exception; - private CountDownLatch getCalledLatch; - - private volatile boolean resolveOnGet; - private T resolveOnGetResult; - private Throwable resolveOnGetException; - - public TestFuture() { - resolved = false; - getCalledLatch = new CountDownLatch(1); - - resolveOnGet = false; - resolveOnGetResult = null; - resolveOnGetException = null; - } - - public void resolve(T val) { - this.result = val; - resolved = true; - synchronized (this) { - this.notifyAll(); - } - } - - public void resolve(Throwable t) { - exception = t; - resolved = true; - synchronized (this) { - this.notifyAll(); - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return resolved; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - getCalledLatch.countDown(); - while (true) { - try { - return get(Integer.MAX_VALUE, TimeUnit.DAYS); - } catch (TimeoutException e) { - // ignore - } - } - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - getCalledLatch.countDown(); - - if (resolveOnGet) { - if (resolveOnGetException != null) - resolve(resolveOnGetException); - else - resolve(resolveOnGetResult); - } - - synchronized (this) { - while (!resolved) { - this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); - } - } - - if (exception != null) { - if (exception instanceof TimeoutException) - throw (TimeoutException) exception; - else if 
(exception instanceof InterruptedException) - throw (InterruptedException) exception; - else - throw new ExecutionException(exception); - } - return result; - } - - /** - * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately. - * @param val the value to return from the future - */ - public void resolveOnGet(T val) { - resolveOnGet = true; - resolveOnGetResult = val; - } - - /** - * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately. - * @param t the exception to return from the future - */ - public void resolveOnGet(Throwable t) { - resolveOnGet = true; - resolveOnGetException = t; - } - - /** - * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with - * the specified value. - * @param val the value to return from the future - */ - public void waitForGetAndResolve(T val) { - waitForGet(); - resolve(val); - } - - /** - * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with - * the specified value. 
- * @param t the exception to use to resolve the future - */ - public void waitForGetAndResolve(Throwable t) { - waitForGet(); - resolve(t); - } - - private void waitForGet() { - try { - getCalledLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException("Unexpected interruption: ", e); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java deleted file mode 100644 index ed99247..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -import org.junit.After; -import org.junit.Before; - -/** - * Base class for tests that use threads. 
It sets up uncaught exception handlers for all known - * thread classes and checks for errors at the end of the test so that failures in background - * threads will cause the test to fail. - */ -public class ThreadedTest { - - protected TestBackgroundThreadExceptionHandler backgroundThreadExceptionHandler; - - @Before - public void setup() { - backgroundThreadExceptionHandler = new TestBackgroundThreadExceptionHandler(); - ShutdownableThread.funcaughtExceptionHandler = backgroundThreadExceptionHandler; - } - - @After - public void teardown() { - backgroundThreadExceptionHandler.verifyNoExceptions(); - ShutdownableThread.funcaughtExceptionHandler = null; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/resources/log4j.properties b/copycat/runtime/src/test/resources/log4j.properties deleted file mode 100644 index d5e90fe..0000000 --- a/copycat/runtime/src/test/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-## -log4j.rootLogger=OFF, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n - -log4j.logger.org.apache.kafka=ERROR http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index c1278e4..5be410c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -164,7 +164,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE.code, joinGroupResult.errorCode) EasyMock.reset(replicaManager) - val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", protocols) + val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "connect", protocols) assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode) } http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/docs/security.html ---------------------------------------------------------------------- diff --git a/docs/security.html b/docs/security.html index d250553..80e30bc 100644 --- a/docs/security.html +++ b/docs/security.html @@ -224,7 +224,7 @@ Apache Kafka allows clients to connect over SSL. 
By default SSL is disabled but </ol> </li> <li><b><a name="jaas_client">Creating Client Side JAAS Config</a></b><br> - Clients (producers, consumers, copycat workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user used for running the client), so obtain or create these principals as needed. Then create a JAAS file as follows: + Clients (producers, consumers, connect workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user used for running the client), so obtain or create these principals as needed. Then create a JAAS file as follows: <pre> KafkaClient { com.sun.security.auth.module.Krb5LoginModule required http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 357305b..0875611 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,4 +15,4 @@ apply from: file('scala.gradle') include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender', - 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file' \ No newline at end of file + 'connect:api', 'connect:runtime', 'connect:json', 'connect:file' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/services/connect.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py new file mode 100644 index 0000000..fbac565 --- /dev/null +++ b/tests/kafkatest/services/connect.py @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.service import Service +from ducktape.utils.util import wait_until +from ducktape.errors import DucktapeError + +from kafkatest.services.kafka.directory import kafka_dir +import signal, random, requests + +class ConnectServiceBase(Service): + """Base class for Kafka Connect services providing some common settings and functionality""" + + logs = { + "kafka_log": { + "path": "/mnt/connect.log", + "collect_default": True}, + } + + def __init__(self, context, num_nodes, kafka, files): + super(ConnectServiceBase, self).__init__(context, num_nodes) + self.kafka = kafka + self.files = files + + def pids(self, node): + """Return process ids for Kafka Connect processes.""" + try: + return [pid for pid in node.account.ssh_capture("cat /mnt/connect.pid", callback=int)] + except: + return [] + + def set_configs(self, config_template_func, connector_config_templates=None): + """ + Set configurations for the worker and the connector to run on + it. These are not provided in the constructor because the worker + config generally needs access to ZK/Kafka services to + create the configuration. 
+ """ + self.config_template_func = config_template_func + self.connector_config_templates = connector_config_templates + + def stop_node(self, node, clean_shutdown=True): + pids = self.pids(node) + sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL + + for pid in pids: + node.account.signal(pid, sig, allow_fail=False) + for pid in pids: + wait_until(lambda: not node.account.alive(pid), timeout_sec=10, err_msg="Kafka Connect standalone process took too long to exit") + + node.account.ssh("rm -f /mnt/connect.pid", allow_fail=False) + + def restart(self): + # We don't want to do any clean up here, just restart the process. + for node in self.nodes: + self.stop_node(node) + self.start_node(node) + + def clean_node(self, node): + if len(self.pids(node)) > 0: + self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % + (self.__class__.__name__, node.account)) + for pid in self.pids(node): + node.account.signal(pid, signal.SIGKILL, allow_fail=False) + + node.account.ssh("rm -rf /mnt/connect.pid /mnt/connect.log /mnt/connect.properties " + " ".join(self.config_filenames() + self.files), allow_fail=False) + + def config_filenames(self): + return ["/mnt/connect-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates or [])] + + + def list_connectors(self, node=None): + return self._rest('/connectors', node=node) + + def create_connector(self, config, node=None): + create_request = { + 'name': config['name'], + 'config': config + } + return self._rest('/connectors', create_request, node=node, method="POST") + + def get_connector(self, name, node=None): + return self._rest('/connectors/' + name, node=node) + + def get_connector_config(self, name, node=None): + return self._rest('/connectors/' + name + '/config', node=node) + + def set_connector_config(self, name, config, node=None): + return self._rest('/connectors/' + name + '/config', config, node=node, method="PUT") + + def 
get_connector_tasks(self, name, node=None): + return self._rest('/connectors/' + name + '/tasks', node=node) + + def delete_connector(self, name, node=None): + return self._rest('/connectors/' + name, node=node, method="DELETE") + + def _rest(self, path, body=None, node=None, method="GET"): + if node is None: + node = random.choice(self.nodes) + + meth = getattr(requests, method.lower()) + url = self._base_url(node) + path + resp = meth(url, json=body) + self.logger.debug("%s %s response: %d", url, method, resp.status_code) + if resp.status_code > 400: + raise ConnectRestError(resp.status_code, resp.text, resp.url) + if resp.status_code == 204: + return None + else: + return resp.json() + + + def _base_url(self, node): + return 'http://' + node.account.hostname + ':' + '8083' + +class ConnectStandaloneService(ConnectServiceBase): + """Runs Kafka Connect in standalone mode.""" + + def __init__(self, context, kafka, files): + super(ConnectStandaloneService, self).__init__(context, 1, kafka, files) + + # For convenience since this service only makes sense with a single node + @property + def node(self): + return self.nodes[0] + + def start_node(self, node): + node.account.create_file("/mnt/connect.properties", self.config_template_func(node)) + remote_connector_configs = [] + for idx, template in enumerate(self.connector_config_templates): + target_file = "/mnt/connect-connector-" + str(idx) + ".properties" + node.account.create_file(target_file, template) + remote_connector_configs.append(target_file) + + self.logger.info("Starting Kafka Connect standalone process") + with node.account.monitor_log("/mnt/connect.log") as monitor: + node.account.ssh("/opt/%s/bin/connect-standalone.sh /mnt/connect.properties " % kafka_dir(node) + + " ".join(remote_connector_configs) + + " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! 
> /mnt/connect.pid") + monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup") + + if len(self.pids(node)) == 0: + raise RuntimeError("No process ids recorded") + + +class ConnectDistributedService(ConnectServiceBase): + """Runs Kafka Connect in distributed mode.""" + + def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets", configs_topic="connect-configs"): + super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files) + self.offsets_topic = offsets_topic + self.configs_topic = configs_topic + + def start_node(self, node): + node.account.create_file("/mnt/connect.properties", self.config_template_func(node)) + if self.connector_config_templates: + raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API") + + self.logger.info("Starting Kafka Connect distributed process") + with node.account.monitor_log("/mnt/connect.log") as monitor: + cmd = "/opt/%s/bin/connect-distributed.sh /mnt/connect.properties " % kafka_dir(node) + cmd += " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > /mnt/connect.pid" + node.account.ssh(cmd) + monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup") + + if len(self.pids(node)) == 0: + raise RuntimeError("No process ids recorded") + + + + +class ConnectRestError(RuntimeError): + def __init__(self, status, msg, url): + self.status = status + self.message = msg + self.url = url + + def __unicode__(self): + return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". 
Response: " + self.message \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/services/copycat.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py deleted file mode 100644 index 5ad9dd5..0000000 --- a/tests/kafkatest/services/copycat.py +++ /dev/null @@ -1,191 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from ducktape.services.service import Service -from ducktape.utils.util import wait_until -from ducktape.errors import DucktapeError - -from kafkatest.services.kafka.directory import kafka_dir -import signal, random, requests - -class CopycatServiceBase(Service): - """Base class for Copycat services providing some common settings and functionality""" - - logs = { - "kafka_log": { - "path": "/mnt/copycat.log", - "collect_default": True}, - } - - def __init__(self, context, num_nodes, kafka, files): - super(CopycatServiceBase, self).__init__(context, num_nodes) - self.kafka = kafka - self.files = files - - def pids(self, node): - """Return process ids for Copycat processes.""" - try: - return [pid for pid in node.account.ssh_capture("cat /mnt/copycat.pid", callback=int)] - except: - return [] - - def set_configs(self, config_template_func, connector_config_templates=None): - """ - Set configurations for the worker and the connector to run on - it. These are not provided in the constructor because the worker - config generally needs access to ZK/Kafka services to - create the configuration. - """ - self.config_template_func = config_template_func - self.connector_config_templates = connector_config_templates - - def stop_node(self, node, clean_shutdown=True): - pids = self.pids(node) - sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL - - for pid in pids: - node.account.signal(pid, sig, allow_fail=False) - for pid in pids: - wait_until(lambda: not node.account.alive(pid), timeout_sec=10, err_msg="Copycat standalone process took too long to exit") - - node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False) - - def restart(self): - # We don't want to do any clean up here, just restart the process. - for node in self.nodes: - self.stop_node(node) - self.start_node(node) - - def clean_node(self, node): - if len(self.pids(node)) > 0: - self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." 
% - (self.__class__.__name__, node.account)) - for pid in self.pids(node): - node.account.signal(pid, signal.SIGKILL, allow_fail=False) - - node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties " + " ".join(self.config_filenames() + self.files), allow_fail=False) - - def config_filenames(self): - return ["/mnt/copycat-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates or [])] - - - def list_connectors(self, node=None): - return self._rest('/connectors', node=node) - - def create_connector(self, config, node=None): - create_request = { - 'name': config['name'], - 'config': config - } - return self._rest('/connectors', create_request, node=node, method="POST") - - def get_connector(self, name, node=None): - return self._rest('/connectors/' + name, node=node) - - def get_connector_config(self, name, node=None): - return self._rest('/connectors/' + name + '/config', node=node) - - def set_connector_config(self, name, config, node=None): - return self._rest('/connectors/' + name + '/config', config, node=node, method="PUT") - - def get_connector_tasks(self, name, node=None): - return self._rest('/connectors/' + name + '/tasks', node=node) - - def delete_connector(self, name, node=None): - return self._rest('/connectors/' + name, node=node, method="DELETE") - - def _rest(self, path, body=None, node=None, method="GET"): - if node is None: - node = random.choice(self.nodes) - - meth = getattr(requests, method.lower()) - url = self._base_url(node) + path - resp = meth(url, json=body) - self.logger.debug("%s %s response: %d", url, method, resp.status_code) - if resp.status_code > 400: - raise CopycatRestError(resp.status_code, resp.text, resp.url) - if resp.status_code == 204: - return None - else: - return resp.json() - - - def _base_url(self, node): - return 'http://' + node.account.hostname + ':' + '8083' - -class CopycatStandaloneService(CopycatServiceBase): - """Runs Copycat in standalone 
mode.""" - - def __init__(self, context, kafka, files): - super(CopycatStandaloneService, self).__init__(context, 1, kafka, files) - - # For convenience since this service only makes sense with a single node - @property - def node(self): - return self.nodes[0] - - def start_node(self, node): - node.account.create_file("/mnt/copycat.properties", self.config_template_func(node)) - remote_connector_configs = [] - for idx, template in enumerate(self.connector_config_templates): - target_file = "/mnt/copycat-connector-" + str(idx) + ".properties" - node.account.create_file(target_file, template) - remote_connector_configs.append(target_file) - - self.logger.info("Starting Copycat standalone process") - with node.account.monitor_log("/mnt/copycat.log") as monitor: - node.account.ssh("/opt/%s/bin/copycat-standalone.sh /mnt/copycat.properties " % kafka_dir(node) + - " ".join(remote_connector_configs) + - " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid") - monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup") - - if len(self.pids(node)) == 0: - raise RuntimeError("No process ids recorded") - - -class CopycatDistributedService(CopycatServiceBase): - """Runs Copycat in distributed mode.""" - - def __init__(self, context, num_nodes, kafka, files, offsets_topic="copycat-offsets", configs_topic="copycat-configs"): - super(CopycatDistributedService, self).__init__(context, num_nodes, kafka, files) - self.offsets_topic = offsets_topic - self.configs_topic = configs_topic - - def start_node(self, node): - node.account.create_file("/mnt/copycat.properties", self.config_template_func(node)) - if self.connector_config_templates: - raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API") - - self.logger.info("Starting Copycat distributed process") - with node.account.monitor_log("/mnt/copycat.log") as monitor: - cmd = 
"/opt/%s/bin/copycat-distributed.sh /mnt/copycat.properties " % kafka_dir(node) - cmd += " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid" - node.account.ssh(cmd) - monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup") - - if len(self.pids(node)) == 0: - raise RuntimeError("No process ids recorded") - - - - -class CopycatRestError(RuntimeError): - def __init__(self, status, msg, url): - self.status = status - self.message = msg - self.url = url - - def __unicode__(self): - return "Copycat REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message \ No newline at end of file