http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java new file mode 100644 index 0000000..6915631 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.apache.kafka.connect.util.MockTime; +import org.apache.kafka.connect.util.ThreadedTest; +import org.easymock.Capture; +import org.easymock.CaptureType; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(PowerMockRunner.class) 
+@PrepareForTest(WorkerSinkTask.class) +@PowerMockIgnore("javax.management.*") +public class WorkerSinkTaskThreadedTest extends ThreadedTest { + + // These are fixed to keep this code simpler. In this example we assume byte[] raw values + // with mix of integer/string in Connect + private static final String TOPIC = "test"; + private static final int PARTITION = 12; + private static final int PARTITION2 = 13; + private static final int PARTITION3 = 14; + private static final long FIRST_OFFSET = 45; + private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; + private static final int KEY = 12; + private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA; + private static final String VALUE = "VALUE"; + private static final byte[] RAW_KEY = "key".getBytes(); + private static final byte[] RAW_VALUE = "value".getBytes(); + + private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION); + private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2); + private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3); + private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200); + + private static final Map<String, String> TASK_PROPS = new HashMap<>(); + static { + TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC); + } + + private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); + private Time time; + @Mock private SinkTask sinkTask; + private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture(); + private WorkerConfig workerConfig; + @Mock private Converter keyConverter; + @Mock + private Converter valueConverter; + private WorkerSinkTask workerTask; + @Mock private KafkaConsumer<byte[], byte[]> consumer; + private WorkerSinkTaskThread workerThread; + private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture(); + + private long recordsReturned; + + @SuppressWarnings("unchecked") + 
@Override + public void setup() { + super.setup(); + time = new MockTime(); + Map<String, String> workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); + workerConfig = new StandaloneConfig(workerProps); + workerTask = PowerMock.createPartialMock( + WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, + taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); + + recordsReturned = 0; + } + + @Test + public void testPollsInBackground() throws Exception { + expectInitializeTask(); + Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L); + expectStopTask(10L); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + for (int i = 0; i < 10; i++) { + workerThread.iteration(); + } + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + // Verify contents match expected values, i.e. that they were translated properly. 
With max + // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches + assertEquals(10, capturedRecords.getValues().size()); + int offset = 0; + for (Collection<SinkRecord> recs : capturedRecords.getValues()) { + assertEquals(1, recs.size()); + for (SinkRecord rec : recs) { + SinkRecord referenceSinkRecord + = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset); + assertEquals(referenceSinkRecord, rec); + offset++; + } + } + + PowerMock.verifyAll(); + } + + @Test + public void testCommit() throws Exception { + expectInitializeTask(); + // Make each poll() take the offset commit interval + Capture<Collection<SinkRecord>> capturedRecords + = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); + expectOffsetFlush(1L, null, null, 0, true); + expectStopTask(2); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + // First iteration gets one record + workerThread.iteration(); + // Second triggers commit, gets a second offset + workerThread.iteration(); + // Commit finishes synchronously for testing so we can check this immediately + assertEquals(0, workerThread.commitFailures()); + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + assertEquals(2, capturedRecords.getValues().size()); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitTaskFlushFailure() throws Exception { + expectInitializeTask(); + Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); + expectOffsetFlush(1L, new RuntimeException(), null, 0, true); + // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization + // for all topic partitions + consumer.seek(TOPIC_PARTITION, FIRST_OFFSET); + 
PowerMock.expectLastCall(); + consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET); + PowerMock.expectLastCall(); + consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); + PowerMock.expectLastCall(); + expectStopTask(2); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + // Second iteration triggers commit + workerThread.iteration(); + workerThread.iteration(); + assertEquals(1, workerThread.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitTaskSuccessAndFlushFailure() throws Exception { + // Validate that we rewind to the correct offsets if a task's flush method throws an exception + + expectInitializeTask(); + Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); + expectOffsetFlush(1L, null, null, 0, true); + expectOffsetFlush(2L, new RuntimeException(), null, 0, true); + // Should rewind to last known committed positions + consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1); + PowerMock.expectLastCall(); + consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET); + PowerMock.expectLastCall(); + consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); + PowerMock.expectLastCall(); + expectStopTask(2); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + // Second iteration triggers first commit, third iteration triggers second (failing) commit + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + assertEquals(1, workerThread.commitFailures()); + assertEquals(false, 
Whitebox.getInternalState(workerThread, "committing")); + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitConsumerFailure() throws Exception { + expectInitializeTask(); + Capture<Collection<SinkRecord>> capturedRecords + = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); + expectOffsetFlush(1L, null, new Exception(), 0, true); + expectStopTask(2); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + // Second iteration triggers commit + workerThread.iteration(); + workerThread.iteration(); + // TODO Response to consistent failures? + assertEquals(1, workerThread.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + PowerMock.verifyAll(); + } + + @Test + public void testCommitTimeout() throws Exception { + expectInitializeTask(); + // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit + Capture<Collection<SinkRecord>> capturedRecords + = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2); + expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false); + expectStopTask(4); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't + // trigger another commit + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + // TODO Response to consistent failures? 
+ assertEquals(1, workerThread.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + PowerMock.verifyAll(); + } + + @Test + public void testAssignmentPauseResume() throws Exception { + // Just validate that the calls are passed through to the consumer, and that where appropriate errors are + // converted + expectInitializeTask(); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)), + sinkTaskContext.getValue().assignment()); + return null; + } + }); + EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3))); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + try { + sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION); + fail("Trying to pause unassigned partition should have thrown an Connect exception"); + } catch (ConnectException e) { + // expected + } + sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2); + return null; + } + }); + consumer.pause(UNASSIGNED_TOPIC_PARTITION); + PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); + consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2); + PowerMock.expectLastCall(); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + try { + sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION); + fail("Trying to resume unassigned partition should have thrown an Connect exception"); + } catch (ConnectException e) { + // expected + } + + sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2); + return null; + } + }); + consumer.resume(UNASSIGNED_TOPIC_PARTITION); + 
PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); + consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2); + PowerMock.expectLastCall(); + + expectStopTask(0); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + PowerMock.verifyAll(); + } + + @Test + public void testRewind() throws Exception { + expectInitializeTask(); + final long startOffset = 40L; + final Map<TopicPartition, Long> offsets = new HashMap<>(); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + offsets.put(TOPIC_PARTITION, startOffset); + sinkTaskContext.getValue().offset(offsets); + return null; + } + }); + + consumer.seek(TOPIC_PARTITION, startOffset); + EasyMock.expectLastCall(); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets(); + assertEquals(0, offsets.size()); + return null; + } + }); + + expectStopTask(3); + EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + workerThread.iteration(); + workerThread.iteration(); + workerTask.stop(); + workerTask.awaitStop(Long.MAX_VALUE); + workerTask.close(); + + PowerMock.verifyAll(); + } + + private void expectInitializeTask() throws Exception { + PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); + + workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", 
"awaitShutdown"}, + workerTask, "mock-worker-thread", time, + workerConfig); + PowerMock.expectPrivate(workerTask, "createWorkerThread") + .andReturn(workerThread); + workerThread.start(); + PowerMock.expectLastCall(); + + consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); + PowerMock.expectLastCall(); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)); + return ConsumerRecords.empty(); + } + }); + EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); + + sinkTask.initialize(EasyMock.capture(sinkTaskContext)); + PowerMock.expectLastCall(); + sinkTask.start(TASK_PROPS); + PowerMock.expectLastCall(); + } + + private void expectStopTask(final long expectedMessages) throws Exception { + final long finalOffset = FIRST_OFFSET + expectedMessages - 1; + + sinkTask.stop(); + PowerMock.expectLastCall(); + + // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the + // consumer so it exits quickly + consumer.wakeup(); + PowerMock.expectLastCall(); + + consumer.close(); + PowerMock.expectLastCall(); + } + + // Note that this can only be called once per test currently + private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception { + // Stub out all the consumer stream/iterator responses, which we just want to verify occur, + // but don't care about the exact details here. 
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer( + new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + // "Sleep" so time will progress + time.sleep(pollDelayMs); + ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( + Collections.singletonMap( + new TopicPartition(TOPIC, PARTITION), + Arrays.asList( + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE) + ))); + recordsReturned++; + return records; + } + }); + EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes(); + EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); + Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL); + sinkTask.put(EasyMock.capture(capturedRecords)); + EasyMock.expectLastCall().anyTimes(); + return capturedRecords; + } + + private IExpectationSetters<Object> expectOnePoll() { + // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of + // returning empty data, we return one record. The expectation is that the data will be ignored by the + // response behavior specified using the return value of this method. 
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + // "Sleep" so time will progress + time.sleep(1L); + ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( + Collections.singletonMap( + new TopicPartition(TOPIC, PARTITION), + Arrays.asList( + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE) + ))); + recordsReturned++; + return records; + } + }); + EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); + EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); + sinkTask.put(EasyMock.anyObject(Collection.class)); + return EasyMock.expectLastCall(); + } + + private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages, + final RuntimeException flushError, + final Exception consumerCommitError, + final long consumerCommitDelayMs, + final boolean invokeCallback) + throws Exception { + final long finalOffset = FIRST_OFFSET + expectedMessages; + + // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one + final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); + offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset)); + offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); + sinkTask.flush(offsetsToCommit); + IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall(); + if (flushError != null) { + flushExpectation.andThrow(flushError).once(); + return null; + } + + final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture(); + consumer.commitAsync(EasyMock.eq(offsetsToCommit), + EasyMock.capture(capturedCallback)); 
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + time.sleep(consumerCommitDelayMs); + if (invokeCallback) + capturedCallback.getValue().onComplete(offsetsToCommit, consumerCommitError); + return null; + } + }); + return capturedCallback; + } + +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java new file mode 100644 index 0000000..9a382b6 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.*;

/**
 * Tests for {@link WorkerSourceTask}: polling the wrapped {@link SourceTask} in the background,
 * converting and sending the polled records through the producer, and flushing offsets through
 * the {@link OffsetStorageWriter}. All collaborators are mocks; only the worker task is real.
 */
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
    private static final String TOPIC = "topic";
    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
    private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);

    // Connect-format data
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Integer KEY = -1;
    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
    private static final Long RECORD = 12L;
    // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
    // is used in the right place.
    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();

    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private WorkerConfig config;
    @Mock private SourceTask sourceTask;
    @Mock private Converter keyConverter;
    @Mock private Converter valueConverter;
    @Mock private KafkaProducer<byte[], byte[]> producer;
    @Mock private OffsetStorageReader offsetReader;
    @Mock private OffsetStorageWriter offsetWriter;
    private WorkerSourceTask workerTask;
    @Mock private Future<RecordMetadata> sendFuture;

    // Callbacks handed to producer.send(); completed by the stubbed send() answer in expectSendRecord()
    private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;

    private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
    // The single record every stubbed poll() returns. Uses TOPIC so it always matches the
    // converter expectations recorded in expectSendRecord().
    private static final List<SourceRecord> RECORDS = Arrays.asList(
            new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
    );

    /** Builds the standalone worker configuration and a fresh producer-callback capture. */
    @Override
    public void setup() {
        super.setup();
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.key.converter.schemas.enable", "false");
        workerProps.put("internal.value.converter.schemas.enable", "false");
        config = new StandaloneConfig(workerProps);
        producerCallbacks = EasyMock.newCapture();
    }

    /** Creates the task under test, wired to the mocked collaborators. */
    private void createWorkerTask() {
        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
                offsetReader, offsetWriter, config, new SystemTime());
    }

    @Test
    public void testPollsInBackground() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(EMPTY_TASK_PROPS);
        EasyMock.expectLastCall();

        final CountDownLatch pollLatch = expectPolls(10);
        // In this test, we don't flush, so nothing goes any further than the offset writer

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        awaitPolls(pollLatch);
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    @Test
    public void testCommit() throws Exception {
        // Test that the task commits properly when prompted
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(EMPTY_TASK_PROPS);
        EasyMock.expectLastCall();

        // We'll wait for some data, then trigger a flush
        final CountDownLatch pollLatch = expectPolls(1);
        expectOffsetFlush(true);

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        awaitPolls(pollLatch);
        assertTrue(workerTask.commitOffsets());
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    @Test
    public void testCommitFailure() throws Exception {
        // Test that a flush that times out is cancelled and reported as a failed commit
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(EMPTY_TASK_PROPS);
        EasyMock.expectLastCall();

        // We'll wait for some data, then trigger a flush that fails
        final CountDownLatch pollLatch = expectPolls(1);
        expectOffsetFlush(false);

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        awaitPolls(pollLatch);
        assertFalse(workerTask.commitOffsets());
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    @Test
    public void testSendRecordsConvertsData() throws Exception {
        createWorkerTask();

        List<SourceRecord> records = new ArrayList<>();
        // Can just use the same record for key and value
        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));

        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();

        PowerMock.replayAll();

        Whitebox.invokeMethod(workerTask, "sendRecords", records);
        // Compare array contents; assertEquals on byte[] would only compare references
        assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
        assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());

        PowerMock.verifyAll();
    }

    @Test
    public void testSlowTaskStart() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(EMPTY_TASK_PROPS);
        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                Utils.sleep(100);
                return null;
            }
        });
        sourceTask.stop();
        EasyMock.expectLastCall();

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
        // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
        // cannot be invoked immediately in the thread trying to stop the task.
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    /**
     * Stubs {@code sourceTask.poll()} to return {@link #RECORDS} on every call and records the
     * send-path expectations those records trigger.
     *
     * @param count minimum number of polls the returned latch waits for
     * @return a latch that is counted down once per {@code poll()} invocation
     */
    private CountDownLatch expectPolls(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        // Note that we stub these to allow any number of calls because the thread will continue to
        // run. The count passed in + latch returned just makes sure we get *at least* that number of
        // calls
        EasyMock.expect(sourceTask.poll())
                .andStubAnswer(new IAnswer<List<SourceRecord>>() {
                    @Override
                    public List<SourceRecord> answer() throws Throwable {
                        latch.countDown();
                        return RECORDS;
                    }
                });
        // Fallout of the poll() call
        expectSendRecord();
        return latch;
    }

    /**
     * Records the expectations for sending one record: key/value conversion, the producer send
     * (whose callbacks are completed immediately), and the offset write.
     *
     * @return a capture holding the {@link ProducerRecord} passed to {@code producer.send()}
     */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws InterruptedException {
        EasyMock.expect(keyConverter.fromConnectData(TOPIC, KEY_SCHEMA, KEY)).andStubReturn(SERIALIZED_KEY);
        EasyMock.expect(valueConverter.fromConnectData(TOPIC, RECORD_SCHEMA, RECORD)).andStubReturn(SERIALIZED_RECORD);

        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
        // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
        EasyMock.expect(
                producer.send(EasyMock.capture(sent),
                        EasyMock.capture(producerCallbacks)))
                .andStubAnswer(new IAnswer<Future<RecordMetadata>>() {
                    @Override
                    public Future<RecordMetadata> answer() throws Throwable {
                        synchronized (producerCallbacks) {
                            for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
                                cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null);
                            }
                            producerCallbacks.reset();
                        }
                        return sendFuture;
                    }
                });
        // 2. Offset data is passed to the offset storage.
        offsetWriter.offset(PARTITION, OFFSET);
        PowerMock.expectLastCall().anyTimes();

        return sent;
    }

    /**
     * Waits up to one second for the latch from {@link #expectPolls(int)} and fails the test if
     * the expected polls did not happen, instead of silently carrying on after a timeout.
     */
    private void awaitPolls(CountDownLatch latch) throws InterruptedException {
        assertTrue("Timed out waiting for the expected number of poll() calls",
                latch.await(1000, TimeUnit.MILLISECONDS));
    }

    /**
     * Records the expectations for one offset flush.
     *
     * @param succeed if true the flush future returns normally; otherwise it throws a
     *                {@link TimeoutException} and {@code cancelFlush()} is expected
     */
    @SuppressWarnings("unchecked")
    private void expectOffsetFlush(boolean succeed) throws Exception {
        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
        Future<Void> flushFuture = PowerMock.createMock(Future.class);
        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
        // Should throw for failure
        IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
                flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
        if (succeed) {
            futureGetExpect.andReturn(null);
        } else {
            futureGetExpect.andThrow(new TimeoutException());
            offsetWriter.cancelFlush();
            PowerMock.expectLastCall();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java new file mode 100644 index 0000000..4e25e9d --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.MockTime;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
 * Tests for {@link Worker}: adding, reconfiguring and removing connectors and tasks, and the
 * cleanup performed when the worker itself is stopped. The offset backing store, connectors and
 * worker tasks are mocks; static instantiation hooks on {@code Worker} are mocked via PowerMock.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(Worker.class)
@PowerMockIgnore("javax.management.*")
public class WorkerTest extends ThreadedTest {

    private static final String CONNECTOR_ID = "test-connector";
    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);

    private WorkerConfig config;
    private Worker worker;
    private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);

    /** Builds the standalone worker configuration shared by every test. */
    @Before
    public void setup() {
        super.setup();

        final String jsonConverter = "org.apache.kafka.connect.json.JsonConverter";
        Map<String, String> settings = new HashMap<>();
        settings.put("key.converter", jsonConverter);
        settings.put("value.converter", jsonConverter);
        settings.put("internal.key.converter", jsonConverter);
        settings.put("internal.value.converter", jsonConverter);
        settings.put("internal.key.converter.schemas.enable", "false");
        settings.put("internal.value.converter.schemas.enable", "false");
        config = new StandaloneConfig(settings);
    }

    @Test
    public void testAddRemoveConnector() throws Exception {
        expectStartStorage();

        // Create
        Connector connector = PowerMock.createMock(Connector.class);
        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
        Map<String, String> props = connectorProps();
        expectConnectorStartup(connector, ctx, props);

        // Remove
        connector.stop();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        startWorker();

        ConnectorConfig connectorConfig = new ConnectorConfig(props);
        assertEquals(Collections.emptySet(), worker.connectorNames());
        worker.addConnector(connectorConfig, ctx);
        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
        assertDuplicateConnectorRejected(connectorConfig, ctx);
        worker.stopConnector(CONNECTOR_ID);
        assertEquals(Collections.emptySet(), worker.connectorNames());
        // Nothing should be left, so this should effectively be a nop
        worker.stop();

        PowerMock.verifyAll();
    }

    @Test(expected = ConnectException.class)
    public void testStopInvalidConnector() {
        expectStartStorage();

        PowerMock.replayAll();

        startWorker();

        worker.stopConnector(CONNECTOR_ID);
    }

    @Test
    public void testReconfigureConnectorTasks() throws Exception {
        expectStartStorage();

        // Create
        Connector connector = PowerMock.createMock(Connector.class);
        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
        Map<String, String> props = connectorProps();
        expectConnectorStartup(connector, ctx, props);

        // Reconfigure
        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
        Map<String, String> connectorTaskProps = new HashMap<>();
        connectorTaskProps.put("foo", "bar");
        EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(connectorTaskProps, connectorTaskProps));

        // Remove
        connector.stop();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        startWorker();

        ConnectorConfig connectorConfig = new ConnectorConfig(props);
        assertEquals(Collections.emptySet(), worker.connectorNames());
        worker.addConnector(connectorConfig, ctx);
        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
        assertDuplicateConnectorRejected(connectorConfig, ctx);

        List<Map<String, String>> generated = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
        Map<String, String> expected = new HashMap<>();
        expected.put("foo", "bar");
        expected.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
        expected.put(SinkTask.TOPICS_CONFIG, "foo,bar");
        assertEquals(2, generated.size());
        assertEquals(expected, generated.get(0));
        assertEquals(expected, generated.get(1));

        worker.stopConnector(CONNECTOR_ID);
        assertEquals(Collections.emptySet(), worker.connectorNames());
        // Nothing should be left, so this should effectively be a nop
        worker.stop();

        PowerMock.verifyAll();
    }


    @Test
    public void testAddRemoveTask() throws Exception {
        expectStartStorage();

        ConnectorTaskId id = new ConnectorTaskId("job", 0);

        // Create
        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
        Map<String, String> origProps = sourceTaskProps();
        expectTaskStartup(id, task, workerTask, origProps);

        // Remove
        workerTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
        workerTask.close();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        startWorker();
        assertEquals(Collections.emptySet(), worker.taskIds());
        worker.addTask(id, new TaskConfig(origProps));
        assertEquals(new HashSet<>(Arrays.asList(id)), worker.taskIds());
        worker.stopTask(id);
        assertEquals(Collections.emptySet(), worker.taskIds());
        // Nothing should be left, so this should effectively be a nop
        worker.stop();

        PowerMock.verifyAll();
    }

    @Test(expected = ConnectException.class)
    public void testStopInvalidTask() {
        expectStartStorage();

        PowerMock.replayAll();

        startWorker();

        worker.stopTask(TASK_ID);
    }

    @Test
    public void testCleanupTasksOnStop() throws Exception {
        expectStartStorage();

        // Create
        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
        Map<String, String> origProps = sourceTaskProps();
        expectTaskStartup(TASK_ID, task, workerTask, origProps);

        // Remove on Worker.stop()
        workerTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
        // Note that in this case we *do not* commit offsets since it's an unclean shutdown
        workerTask.close();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        startWorker();
        worker.addTask(TASK_ID, new TaskConfig(origProps));
        worker.stop();

        PowerMock.verifyAll();
    }

    /** Expects the offset backing store to be configured and started by {@code Worker.start()}. */
    private void expectStartStorage() {
        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        offsetBackingStore.start();
        EasyMock.expectLastCall();
    }

    /** Expects the offset backing store to be stopped. */
    private void expectStopStorage() {
        offsetBackingStore.stop();
        EasyMock.expectLastCall();
    }

    /** Creates the worker under test against the mocked backing store and starts it. */
    private void startWorker() {
        worker = new Worker(new MockTime(), config, offsetBackingStore);
        worker.start();
    }

    /** Returns the connector configuration used by the connector tests. */
    private static Map<String, String> connectorProps() {
        Map<String, String> props = new HashMap<>();
        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
        return props;
    }

    /** Returns the task configuration used by the task tests. */
    private static Map<String, String> sourceTaskProps() {
        Map<String, String> props = new HashMap<>();
        props.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
        return props;
    }

    /**
     * Expects the worker to instantiate {@code connector} through the mocked static hook, query
     * its version, and then initialize and start it with {@code props}.
     */
    private void expectConnectorStartup(Connector connector, ConnectorContext ctx, Map<String, String> props) throws Exception {
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
        EasyMock.expect(connector.version()).andReturn("1.0");

        connector.initialize(ctx);
        EasyMock.expectLastCall();
        connector.start(props);
        EasyMock.expectLastCall();
    }

    /**
     * Expects the worker to instantiate {@code task} through the mocked static hook, wrap it in a
     * new {@link WorkerSourceTask} (replaced by {@code workerTask}) and start that with {@code props}.
     */
    private void expectTaskStartup(ConnectorTaskId id, TestSourceTask task, WorkerSourceTask workerTask,
                                   Map<String, String> props) throws Exception {
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
        EasyMock.expect(task.version()).andReturn("1.0");

        PowerMock.expectNew(
                WorkerSourceTask.class, EasyMock.eq(id), EasyMock.eq(task),
                EasyMock.anyObject(Converter.class),
                EasyMock.anyObject(Converter.class),
                EasyMock.anyObject(KafkaProducer.class),
                EasyMock.anyObject(OffsetStorageReader.class),
                EasyMock.anyObject(OffsetStorageWriter.class),
                EasyMock.anyObject(WorkerConfig.class),
                EasyMock.anyObject(Time.class))
                .andReturn(workerTask);
        workerTask.start(props);
        EasyMock.expectLastCall();
    }

    /** Asserts that adding a connector whose name is already registered is rejected. */
    private void assertDuplicateConnectorRejected(ConnectorConfig connectorConfig, ConnectorContext ctx) {
        try {
            worker.addConnector(connectorConfig, ctx);
            fail("Should have thrown exception when trying to add connector with same name.");
        } catch (ConnectException e) {
            // expected
        }
    }


    /** Minimal connector whose class is referenced by the connector configuration. */
    private static class TestConnector extends Connector {
        @Override
        public String version() {
            return "1.0";
        }

        @Override
        public void start(Map<String, String> props) {

        }

        @Override
        public Class<? extends Task> taskClass() {
            return null;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            return null;
        }

        @Override
        public void stop() {

        }
    }

    /** Minimal source task whose class is referenced by the task configuration. */
    private static class TestSourceTask extends SourceTask {
        public TestSourceTask() {
        }

        @Override
        public String version() {
            return "1.0";
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            return null;
        }

        @Override
        public void stop() {
        }
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java new file mode 100644 index 0000000..36f8fce --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -0,0 +1,573 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/

package org.apache.kafka.connect.runtime.distributed;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.TestFuture;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link DistributedHerder}. Each test records expectations on the mocked group member,
 * config storage and worker, then drives the herder with {@code tick()} / {@code halt()} or through
 * the callbacks obtained from it in {@link #setUp()}, and finally verifies the mocks.
 *
 * NOTE(review): the helpers expectRebalance(...) and expectPostRebalanceCatchup(...) and the
 * BogusSourceConnector / BogusSourceTask classes used below are declared outside this excerpt.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(DistributedHerder.class)
@PowerMockIgnore("javax.management.*")
public class DistributedHerderTest {
    // Configuration passed to DistributedConfig when the herder is built in setUp()
    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
    static {
        HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
        HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
        // The WorkerConfig base class has some required settings without defaults
        HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
    }
    private static final String MEMBER_URL = "memberUrl";

    // Connector names and the three task ids of CONN1 used throughout the tests
    private static final String CONN1 = "sourceA";
    private static final String CONN2 = "sourceB";
    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
    private static final Integer MAX_TASKS = 3;
    private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
    static {
        CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
        CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
        CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
        CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
    }
    // Copy of CONN1_CONFIG that differs only in its topics list
    private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
    static {
        CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
    }
    private static final Map<String, String> CONN2_CONFIG = new HashMap<>();
    static {
        CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
        CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
        CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
        CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
    }
    private static final Map<String, String> TASK_CONFIG = new HashMap<>();
    static {
        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName());
    }
    // The same task config three times, one entry per task of CONN1
    private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
    static {
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
    }
    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP = new HashMap<>();
    static {
        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
    }
    // Config snapshots containing only CONN1 (with three tasks); the second carries the updated connector config
    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
            Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, Collections.<String>emptySet());
    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());

    @Mock private KafkaConfigStorage configStorage;
    @Mock private WorkerGroupMember member;
    private MockTime time;
    private DistributedHerder herder;
    @Mock private Worker worker;
    @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;

    // Callbacks and listener obtained from the herder in setUp() so tests can invoke them directly
    private Callback<String> connectorConfigCallback;
    private Callback<List<ConnectorTaskId>> taskConfigCallback;
    private WorkerRebalanceListener rebalanceListener;

    /**
     * Builds a partially mocked herder (only {@code backoff} is mocked) on top of the mocked
     * collaborators and fetches its internal callbacks via reflection.
     */
    @Before
    public void setUp() throws Exception {
        // NOTE(review): this assignment replaces whatever the @Mock annotation put in the field -- confirm it is intended
        worker = PowerMock.createMock(Worker.class);
        time = new MockTime();

        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
                new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time);
        connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
        taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
        rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
    }

    /** One tick after being assigned CONN1 and TASK1 should start that connector and that task. */
    @Test
    public void testJoinAssignment() {
        // Join group and get assignment
        EasyMock.expect(member.memberId()).andStubReturn("member");
        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();

        PowerMock.verifyAll();
    }

    /** halt() should stop every connector and task the worker reports, then the member and the config storage. */
    @Test
    public void testHaltCleansUpWorker() {
        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
        worker.stopConnector(CONN1);
        PowerMock.expectLastCall();
        EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
        worker.stopTask(TASK1);
        PowerMock.expectLastCall();
        member.stop();
        PowerMock.expectLastCall();
        configStorage.stop();
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.halt();

        PowerMock.verifyAll();
    }

    /** Creating a connector that is not in the snapshot writes its config and reports it as created. */
    @Test
    public void testCreateConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();
        // CONN2 is new, should succeed
        configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
        PowerMock.expectLastCall();
        ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList());
        putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info));
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // No immediate action besides this -- change will be picked up via the config log

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /** Creating a connector that already exists in the snapshot completes the callback with an error and no result. */
    @Test
    public void testCreateConnectorAlreadyExists() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();
        // CONN1 already exists
        putConnectorCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // No immediate action besides this -- change will be picked up via the config log

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /** Deleting a connector writes a null config for it and completes the callback with a not-created, null-info result. */
    @Test
    public void testDestroyConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        // Start with one connector
        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);

        // And delete the connector
        member.wakeup();
        PowerMock.expectLastCall();
        configStorage.putConnectorConfig(CONN1, null);
        PowerMock.expectLastCall();
        putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();
        // No immediate action besides this -- change will be picked up via the config log

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN1, null, true, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /** A config notification for a connector this member does not run should lead to a rejoin and a new assignment. */
    @Test
    public void testConnectorConfigAdded() {
        // If a connector was added, we need to rebalance
        EasyMock.expect(member.memberId()).andStubReturn("member");

        // join, no configs so no need to catch up on config topic
        expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // apply config
        // NOTE(review): wakeup() is recorded here without a following expectLastCall(), unlike the other void calls
        member.wakeup();
        member.ensureActive();
        PowerMock.expectLastCall();
        // Checks for config updates and starts rebalance
        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
        member.requestRejoin();
        PowerMock.expectLastCall();
        // Performs rebalance and gets new assignment
        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick(); // join
        connectorConfigCallback.onCompletion(null, CONN1); // read updated config
        herder.tick(); // apply config
        herder.tick(); // do rebalance

        PowerMock.verifyAll();
    }

    /** A config notification for a connector this member already runs should restart it in place, without a rejoin. */
    @Test
    public void testConnectorConfigUpdate() {
        // Connector config can be applied without any rebalance

        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));

        // join
        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // apply config
        // NOTE(review): wakeup() is recorded here without a following expectLastCall(), unlike the other void calls
        member.wakeup();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
        worker.stopConnector(CONN1);
        PowerMock.expectLastCall();
        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick(); // join
        connectorConfigCallback.onCompletion(null, CONN1); // read updated config
        herder.tick(); // apply config

        PowerMock.verifyAll();
    }

    @Test
+ public void testTaskConfigAdded() { + // Task config always requires rebalance + EasyMock.expect(member.memberId()).andStubReturn("member"); + + // join + expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // apply config + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + // Checks for config updates and starts rebalance + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + member.requestRejoin(); + PowerMock.expectLastCall(); + // Performs rebalance and gets new assignment + expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), + ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0)); + worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject()); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); // join + taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, TASK2)); // read updated config + herder.tick(); // apply config + herder.tick(); // do rebalance + + PowerMock.verifyAll(); + } + + /** First rebalance reports CONFIG_MISMATCH and the readToEnd future is set up with a TimeoutException: expects a backoff call and a rejoin request, then a second rebalance with NO_ERROR after which CONN1 and TASK1 are added to the worker. */ @Test + public void testJoinLeaderCatchUpFails() throws Exception { + // Join group and as leader fail to do assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), + ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + // Reading to end of log times out + TestFuture<Void> readToEndFuture = new TestFuture<>(); + readToEndFuture.resolveOnGet(new TimeoutException()); + EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture); + /* backoff is expected by name via expectPrivate; presumably herder is a partial mock -- confirm in the test setup */ PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT); + member.requestRejoin(); + + // 
After backoff, restart the process and this time succeed + expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); + expectPostRebalanceCatchup(SNAPSHOT); + + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); + PowerMock.expectLastCall(); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject()); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + herder.tick(); + + PowerMock.verifyAll(); + } + + /** Queues the four read requests (connectors, connectorInfo, connectorConfig, taskConfigs) before a single tick, then asserts that every callback is done and holds the expected value. */ @Test + public void testAccessors() throws Exception { + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + + + member.wakeup(); + PowerMock.expectLastCall().anyTimes(); + // list connectors, get connector info, get connector config, get task configs + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + + PowerMock.replayAll(); + + FutureCallback<Collection<String>> listConnectorsCb = new FutureCallback<>(); + herder.connectors(listConnectorsCb); + FutureCallback<ConnectorInfo> connectorInfoCb = new FutureCallback<>(); + herder.connectorInfo(CONN1, connectorInfoCb); + FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>(); + herder.connectorConfig(CONN1, connectorConfigCb); + FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>(); + herder.taskConfigs(CONN1, taskConfigsCb); + + herder.tick(); + assertTrue(listConnectorsCb.isDone()); + assertEquals(Collections.singleton(CONN1), listConnectorsCb.get()); + assertTrue(connectorInfoCb.isDone()); + ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2)); + assertEquals(info, connectorInfoCb.get()); + assertTrue(connectorConfigCb.isDone()); 
+ assertEquals(CONN1_CONFIG, connectorConfigCb.get()); + assertTrue(taskConfigsCb.isDone()); + assertEquals(Arrays.asList( + new TaskInfo(TASK0, TASK_CONFIG), + new TaskInfo(TASK1, TASK_CONFIG), + new TaskInfo(TASK2, TASK_CONFIG)), + taskConfigsCb.get()); + + PowerMock.verifyAll(); + } + + /** Updates the config of an existing connector over three ticks: read the original config, put CONN1_CONFIG_UPDATED (expecting the connector to be stopped and re-added), then read the config again. */ @Test + public void testPutConnectorConfig() throws Exception { + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); + PowerMock.expectLastCall(); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + + // list connectors, get connector info, get connector config, get task configs + member.wakeup(); + PowerMock.expectLastCall().anyTimes(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Poll loop for second round of calls + member.ensureActive(); + PowerMock.expectLastCall(); + configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + // Simulate response to writing config + waiting until end of log to be read + connectorConfigCallback.onCompletion(null, CONN1); + return null; + } + }); + // As a result of reconfig, should need to update snapshot. 
With only connector updates, we'll just restart + // connector without rebalance + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); + worker.stopConnector(CONN1); + PowerMock.expectLastCall(); + Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture(); + worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject()); + PowerMock.expectLastCall(); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Third tick just to read the config + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + // Should pick up original config + FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>(); + herder.connectorConfig(CONN1, connectorConfigCb); + herder.tick(); + assertTrue(connectorConfigCb.isDone()); + assertEquals(CONN1_CONFIG, connectorConfigCb.get()); + + // Apply new config. 
+ FutureCallback<Herder.Created<ConnectorInfo>> putConfigCb = new FutureCallback<>(); + herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, putConfigCb); + herder.tick(); + assertTrue(putConfigCb.isDone()); + ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2)); + assertEquals(new Herder.Created<>(false, updatedInfo), putConfigCb.get()); + + // Check config again to validate change + connectorConfigCb = new FutureCallback<>(); + herder.connectorConfig(CONN1, connectorConfigCb); + herder.tick(); + assertTrue(connectorConfigCb.isDone()); + assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get()); + // The config passed to Worker should carry the expected topics list + assertEquals(Arrays.asList("foo", "bar", "baz"), + capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG)); + PowerMock.verifyAll(); + } + + /** Placeholder test: the body holds only the FIXME below and makes no assertions. */ @Test + public void testInconsistentConfigs() throws Exception { + // FIXME: if we have inconsistent configs, we need to request forced reconfig + write of the connector's task configs + // This requires inter-worker communication, so needs the REST API + } + + + /** Shorthand for the six-argument overload: passes null for both revoked collections and NO_ERROR as the assignment error. */ private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { + expectRebalance(null, null, ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks); + } + + // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks. 
+ private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks, + final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { + /* Answer the expected ensureActive() call by invoking the rebalance listener callbacks directly. */ member.ensureActive(); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + /* a null revokedConnectors skips the onRevoked callback */ if (revokedConnectors != null) + rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks); + ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment( + error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks); + rebalanceListener.onAssigned(assignment); + return null; + } + }); + member.wakeup(); + PowerMock.expectLastCall(); + } + + /** Records a configStorage.readToEnd() returning a future set up with resolveOnGet((Void) null), and a configStorage.snapshot() returning readToEndSnapshot. */ private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) { + TestFuture<Void> readToEndFuture = new TestFuture<>(); + readToEndFuture.resolveOnGet((Void) null); + EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture); + EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot); + } + + + // We need to use a real class here due to some issue with mocking java.lang.Class + private abstract class BogusSourceConnector extends SourceConnector { + } + + private abstract class BogusSourceTask extends SourceTask { + } + +}