http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java deleted file mode 100644 index ac9df44..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java +++ /dev/null @@ -1,443 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.runtime.distributed; - -import org.apache.kafka.clients.ClientRequest; -import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.GroupCoordinatorResponse; -import org.apache.kafka.common.requests.JoinGroupResponse; -import org.apache.kafka.common.requests.SyncGroupRequest; -import org.apache.kafka.common.requests.SyncGroupResponse; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.copycat.storage.KafkaConfigStorage; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; -import org.easymock.Mock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.powermock.api.easymock.PowerMock; -import org.powermock.reflect.Whitebox; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -public class WorkerCoordinatorTest { - - private static final String LEADER_URL = "leaderUrl:8083"; - private static final String MEMBER_URL = "memberUrl:8083"; - - private String connectorId = "connector"; - private String connectorId2 = "connector2"; - private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0); - private ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1); - private ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0); - - private String groupId = "test-group"; - private int sessionTimeoutMs = 10; - private int heartbeatIntervalMs = 2; - private long retryBackoffMs = 100; - private long requestTimeoutMs = 5000; - private MockTime time; - private MockClient client; - private Cluster cluster = TestUtils.singletonCluster("topic", 1); - private Node node = cluster.nodes().get(0); - private Metadata metadata; - private Metrics metrics; - private Map<String, String> metricTags = new LinkedHashMap<>(); - private ConsumerNetworkClient consumerClient; - private MockRebalanceListener rebalanceListener; - @Mock private KafkaConfigStorage configStorage; - private WorkerCoordinator coordinator; - - private ClusterConfigState configState1; - private ClusterConfigState configState2; - - @Before - public void setup() { - this.time = new MockTime(); - this.client = new MockClient(time); - this.metadata = new Metadata(0, Long.MAX_VALUE); - this.metadata.update(cluster, time.milliseconds()); - this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); - this.metrics = new Metrics(time); - this.rebalanceListener = new MockRebalanceListener(); - this.configStorage = PowerMock.createMock(KafkaConfigStorage.class); - - client.setNode(node); - - this.coordinator = new WorkerCoordinator(consumerClient, - groupId, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - "consumer" + groupId, - metricTags, - time, - requestTimeoutMs, - retryBackoffMs, - LEADER_URL, - configStorage, - rebalanceListener); - - configState1 = new ClusterConfigState( - 1L, Collections.singletonMap(connectorId, 1), - Collections.singletonMap(connectorId, (Map<String, String>) new HashMap<String, String>()), - Collections.singletonMap(taskId0, (Map<String, String>) new HashMap<String, String>()), - Collections.<String>emptySet() - ); - Map<String, Integer> configState2ConnectorTaskCounts = new HashMap<>(); - configState2ConnectorTaskCounts.put(connectorId, 2); - configState2ConnectorTaskCounts.put(connectorId2, 1); - Map<String, Map<String, String>> configState2ConnectorConfigs = new HashMap<>(); - configState2ConnectorConfigs.put(connectorId, new HashMap<String, String>()); - configState2ConnectorConfigs.put(connectorId2, new HashMap<String, String>()); - Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = new HashMap<>(); - configState2TaskConfigs.put(taskId0, new HashMap<String, String>()); - configState2TaskConfigs.put(taskId1, new HashMap<String, String>()); - configState2TaskConfigs.put(taskId2, new HashMap<String, String>()); - configState2 = new ClusterConfigState( - 2L, configState2ConnectorTaskCounts, - configState2ConnectorConfigs, - configState2TaskConfigs, - Collections.<String>emptySet() - ); - } - - @After - public void teardown() { - this.metrics.close(); - } - - // We only test functionality unique to WorkerCoordinator. Most functionality is already well tested via the tests - // that cover AbstractCoordinator & ConsumerCoordinator. - - @Test - public void testMetadata() { - EasyMock.expect(configStorage.snapshot()).andReturn(configState1); - - PowerMock.replayAll(); - - LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata(); - assertEquals(1, serialized.size()); - CopycatProtocol.WorkerState state = CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL)); - assertEquals(1, state.offset()); - - PowerMock.verifyAll(); - } - - @Test - public void testNormalJoinGroupLeader() { - EasyMock.expect(configStorage.snapshot()).andReturn(configState1); - - PowerMock.replayAll(); - - final String consumerId = "leader"; - - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // normal join group - Map<String, Long> memberConfigOffsets = new HashMap<>(); - memberConfigOffsets.put("leader", 1L); - memberConfigOffsets.put("member", 1L); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code())); - client.prepareResponse(new MockClient.RequestMatcher() { - @Override - public boolean matches(ClientRequest request) { - SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); - return sync.memberId().equals(consumerId) && - sync.generationId() == 1 && - sync.groupAssignment().containsKey(consumerId); - } - }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId), - Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); - coordinator.ensureActiveGroup(); - - assertFalse(coordinator.needRejoin()); - assertEquals(0, rebalanceListener.revokedCount); - assertEquals(1, rebalanceListener.assignedCount); - assertFalse(rebalanceListener.assignment.failed()); - assertEquals(1L, rebalanceListener.assignment.offset()); - assertEquals("leader", rebalanceListener.assignment.leader()); - assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors()); - assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks()); - - PowerMock.verifyAll(); - } - - @Test - public void testNormalJoinGroupFollower() { - EasyMock.expect(configStorage.snapshot()).andReturn(configState1); - - PowerMock.replayAll(); - - final String memberId = "member"; - - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // normal join group - client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); - client.prepareResponse(new MockClient.RequestMatcher() { - @Override - public boolean matches(ClientRequest request) { - SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); - return sync.memberId().equals(memberId) && - sync.generationId() == 1 && - sync.groupAssignment().isEmpty(); - } - }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(), - Collections.singletonList(taskId0), Errors.NONE.code())); - coordinator.ensureActiveGroup(); - - assertFalse(coordinator.needRejoin()); - assertEquals(0, rebalanceListener.revokedCount); - assertEquals(1, rebalanceListener.assignedCount); - assertFalse(rebalanceListener.assignment.failed()); - assertEquals(1L, rebalanceListener.assignment.offset()); - assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors()); - assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks()); - - PowerMock.verifyAll(); - } - - @Test - public void testJoinLeaderCannotAssign() { - // If the selected leader can't get up to the maximum offset, it will fail to assign and we should immediately - // need to retry the join. - - // When the first round fails, we'll take an updated config snapshot - EasyMock.expect(configStorage.snapshot()).andReturn(configState1); - EasyMock.expect(configStorage.snapshot()).andReturn(configState2); - - PowerMock.replayAll(); - - final String memberId = "member"; - - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // config mismatch results in assignment error - client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); - MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() { - @Override - public boolean matches(ClientRequest request) { - SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); - return sync.memberId().equals(memberId) && - sync.generationId() == 1 && - sync.groupAssignment().isEmpty(); - } - }; - client.prepareResponse(matcher, syncGroupResponse(CopycatProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L, - Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); - client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); - client.prepareResponse(matcher, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, - Collections.<String>emptyList(), Collections.singletonList(taskId0), Errors.NONE.code())); - coordinator.ensureActiveGroup(); - - PowerMock.verifyAll(); - } - - @Test - public void testRejoinGroup() { - EasyMock.expect(configStorage.snapshot()).andReturn(configState1); - EasyMock.expect(configStorage.snapshot()).andReturn(configState1); - - PowerMock.replayAll(); - - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // join the group once - client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(), - Collections.singletonList(taskId0), Errors.NONE.code())); - coordinator.ensureActiveGroup(); - - assertEquals(0, rebalanceListener.revokedCount); - assertEquals(1, rebalanceListener.assignedCount); - assertFalse(rebalanceListener.assignment.failed()); - assertEquals(1L, rebalanceListener.assignment.offset()); - assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors()); - assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks()); - - // and join the group again - coordinator.requestRejoin(); - client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId), - Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); - coordinator.ensureActiveGroup(); - - assertEquals(1, rebalanceListener.revokedCount); - assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors); - assertEquals(Collections.singletonList(taskId0), rebalanceListener.revokedTasks); - assertEquals(2, rebalanceListener.assignedCount); - assertFalse(rebalanceListener.assignment.failed()); - assertEquals(1L, rebalanceListener.assignment.offset()); - assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors()); - assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks()); - - PowerMock.verifyAll(); - } - - @Test - public void testLeaderPerformAssignment1() throws Exception { - // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its - // output. So we test it directly here. - - EasyMock.expect(configStorage.snapshot()).andReturn(configState1); - - PowerMock.replayAll(); - - // Prime the current configuration state - coordinator.metadata(); - - Map<String, ByteBuffer> configs = new HashMap<>(); - // Mark everyone as in sync with configState1 - configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(LEADER_URL, 1L))); - configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(MEMBER_URL, 1L))); - Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs); - - // configState1 has 1 connector, 1 task - CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader")); - assertEquals(false, leaderAssignment.failed()); - assertEquals("leader", leaderAssignment.leader()); - assertEquals(1, leaderAssignment.offset()); - assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors()); - assertEquals(Collections.emptyList(), leaderAssignment.tasks()); - - CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member")); - assertEquals(false, memberAssignment.failed()); - assertEquals("leader", memberAssignment.leader()); - assertEquals(1, memberAssignment.offset()); - assertEquals(Collections.emptyList(), memberAssignment.connectors()); - assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks()); - - PowerMock.verifyAll(); - } - - @Test - public void testLeaderPerformAssignment2() throws Exception { - // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its - // output. So we test it directly here. - - EasyMock.expect(configStorage.snapshot()).andReturn(configState2); - - PowerMock.replayAll(); - - // Prime the current configuration state - coordinator.metadata(); - - Map<String, ByteBuffer> configs = new HashMap<>(); - // Mark everyone as in sync with configState1 - configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(LEADER_URL, 1L))); - configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(MEMBER_URL, 1L))); - Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs); - - // configState2 has 2 connector, 3 tasks and should trigger round robin assignment - CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader")); - assertEquals(false, leaderAssignment.failed()); - assertEquals("leader", leaderAssignment.leader()); - assertEquals(1, leaderAssignment.offset()); - assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors()); - assertEquals(Arrays.asList(taskId1, taskId2), leaderAssignment.tasks()); - - CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member")); - assertEquals(false, memberAssignment.failed()); - assertEquals("leader", memberAssignment.leader()); - assertEquals(1, memberAssignment.offset()); - assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors()); - assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks()); - - PowerMock.verifyAll(); - } - - - private Struct groupMetadataResponse(Node node, short error) { - GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); - return response.toStruct(); - } - - private Struct joinGroupLeaderResponse(int generationId, String memberId, - Map<String, Long> configOffsets, short error) { - Map<String, ByteBuffer> metadata = new HashMap<>(); - for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) { - // We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID - String memberUrl = configStateEntry.getKey(); - long configOffset = configStateEntry.getValue(); - ByteBuffer buf = CopycatProtocol.serializeMetadata(new CopycatProtocol.WorkerState(memberUrl, configOffset)); - metadata.put(configStateEntry.getKey(), buf); - } - return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct(); - } - - private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) { - return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId, - Collections.<String, ByteBuffer>emptyMap()).toStruct(); - } - - private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds, - List<ConnectorTaskId> taskIds, short error) { - CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds); - ByteBuffer buf = CopycatProtocol.serializeAssignment(assignment); - return new SyncGroupResponse(error, buf).toStruct(); - } - - - private static class MockRebalanceListener implements WorkerRebalanceListener { - public CopycatProtocol.Assignment assignment = null; - - public String revokedLeader; - public Collection<String> revokedConnectors; - public Collection<ConnectorTaskId> revokedTasks; - - public int revokedCount = 0; - public int assignedCount = 0; - - @Override - public void onAssigned(CopycatProtocol.Assignment assignment) { - this.assignment = assignment; - assignedCount++; - } - - @Override - public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) { - this.revokedLeader = leader; - this.revokedConnectors = connectors; - this.revokedTasks = tasks; - revokedCount++; - } - } -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java deleted file mode 100644 index c987092..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java +++ /dev/null @@ -1,364 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.rest.resources; - -import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.kafka.copycat.errors.AlreadyExistsException; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.NotFoundException; -import org.apache.kafka.copycat.runtime.ConnectorConfig; -import org.apache.kafka.copycat.runtime.Herder; -import org.apache.kafka.copycat.runtime.distributed.NotLeaderException; -import org.apache.kafka.copycat.runtime.rest.RestServer; -import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; -import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest; -import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(RestServer.class) -@PowerMockIgnore("javax.management.*") -public class ConnectorsResourceTest { - // Note trailing / and that we do *not* use LEADER_URL to construct our reference values. This checks that we handle - // URL construction properly, avoiding //, which will mess up routing in the REST server - private static final String LEADER_URL = "http://leader:8083/"; - private static final String CONNECTOR_NAME = "test"; - private static final String CONNECTOR2_NAME = "test2"; - private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>(); - static { - CONNECTOR_CONFIG.put("name", CONNECTOR_NAME); - CONNECTOR_CONFIG.put("sample_config", "test_config"); - } - private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = Arrays.asList( - new ConnectorTaskId(CONNECTOR_NAME, 0), - new ConnectorTaskId(CONNECTOR_NAME, 1) - ); - private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>(); - static { - TASK_CONFIGS.add(Collections.singletonMap("config", "value")); - TASK_CONFIGS.add(Collections.singletonMap("config", "other_value")); - } - private static final List<TaskInfo> TASK_INFOS = new ArrayList<>(); - static { - TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), TASK_CONFIGS.get(0))); - TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1))); - } - - - @Mock - private Herder herder; - private ConnectorsResource connectorsResource; - - @Before - public void setUp() throws NoSuchMethodException { - PowerMock.mockStatic(RestServer.class, - RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class)); - connectorsResource = new ConnectorsResource(herder); - } - - @Test - public void testListConnectors() throws Throwable { - final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); - herder.connectors(EasyMock.capture(cb)); - expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); - - PowerMock.replayAll(); - - Collection<String> connectors = connectorsResource.listConnectors(); - // Ordering isn't guaranteed, compare sets - assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); - - PowerMock.verifyAll(); - } - - @Test - public void testListConnectorsNotLeader() throws Throwable { - final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); - herder.connectors(EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); - // Should forward request - EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("GET"), - EasyMock.isNull(), EasyMock.anyObject(TypeReference.class))) - .andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME))); - - PowerMock.replayAll(); - - Collection<String> connectors = connectorsResource.listConnectors(); - // Ordering isn't guaranteed, compare sets - assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); - - PowerMock.verifyAll(); - } - - @Test(expected = CopycatException.class) - public void testListConnectorsNotSynced() throws Throwable { - final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); - herder.connectors(EasyMock.capture(cb)); - expectAndCallbackException(cb, new CopycatException("not synced")); - - PowerMock.replayAll(); - - // throws - connectorsResource.listConnectors(); - } - - @Test - public void testCreateConnector() throws Throwable { - CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); - - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); - expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES))); - - PowerMock.replayAll(); - - connectorsResource.createConnector(body); - - PowerMock.verifyAll(); - } - - @Test - public void testCreateConnectorNotLeader() throws Throwable { - CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); - - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); - // Should forward request - EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject())) - .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES))); - - PowerMock.replayAll(); - - connectorsResource.createConnector(body); - - PowerMock.verifyAll(); - } - - @Test(expected = AlreadyExistsException.class) - public void testCreateConnectorExists() throws Throwable { - CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); - - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); - expectAndCallbackException(cb, new AlreadyExistsException("already exists")); - - PowerMock.replayAll(); - - connectorsResource.createConnector(body); - - PowerMock.verifyAll(); - } - - @Test - public void testDeleteConnector() throws Throwable { - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb)); - expectAndCallbackResult(cb, null); - - PowerMock.replayAll(); - - connectorsResource.destroyConnector(CONNECTOR_NAME); - - PowerMock.verifyAll(); - } - - @Test - public void testDeleteConnectorNotLeader() throws Throwable { - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); - // Should forward request - EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME, "DELETE", null, null)) - .andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null)); - - PowerMock.replayAll(); - - connectorsResource.destroyConnector(CONNECTOR_NAME); - - PowerMock.verifyAll(); - } - - // Not found exceptions should pass through to caller so they can be processed for 404s - @Test(expected = NotFoundException.class) - public void testDeleteConnectorNotFound() throws Throwable { - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); - - connectorsResource.destroyConnector(CONNECTOR_NAME); - - PowerMock.verifyAll(); - } - - @Test - public void testGetConnector() throws Throwable { - final Capture<Callback<ConnectorInfo>> cb = Capture.newInstance(); - herder.connectorInfo(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackResult(cb, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)); - - PowerMock.replayAll(); - - ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME); - assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES), connInfo); - - PowerMock.verifyAll(); - } - - @Test - public void testGetConnectorConfig() throws Throwable { - final Capture<Callback<Map<String, String>>> cb = Capture.newInstance(); - herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackResult(cb, CONNECTOR_CONFIG); - - PowerMock.replayAll(); - - Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME); - assertEquals(CONNECTOR_CONFIG, connConfig); - - PowerMock.verifyAll(); - } - - @Test(expected = NotFoundException.class) - public void testGetConnectorConfigConnectorNotFound() throws Throwable { - final Capture<Callback<Map<String, String>>> cb = Capture.newInstance(); - herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); - - connectorsResource.getConnectorConfig(CONNECTOR_NAME); - - PowerMock.verifyAll(); - } - - @Test - public void testPutConnectorConfig() throws Throwable { - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(CONNECTOR_CONFIG), EasyMock.eq(true), EasyMock.capture(cb)); - expectAndCallbackResult(cb, new Herder.Created<>(false, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES))); - - PowerMock.replayAll(); - - connectorsResource.putConnectorConfig(CONNECTOR_NAME, CONNECTOR_CONFIG); - - PowerMock.verifyAll(); - } - - @Test - public void testGetConnectorTaskConfigs() throws Throwable { - final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance(); - herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackResult(cb, TASK_INFOS); - - PowerMock.replayAll(); - - List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME); - assertEquals(TASK_INFOS, taskInfos); - - PowerMock.verifyAll(); - } - - @Test(expected = NotFoundException.class) - public void testGetConnectorTaskConfigsConnectorNotFound() throws Throwable { - final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance(); - herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("connector not found")); - - PowerMock.replayAll(); - - connectorsResource.getTaskConfigs(CONNECTOR_NAME); - - PowerMock.verifyAll(); - } - - @Test - public void testPutConnectorTaskConfigs() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb)); - expectAndCallbackResult(cb, null); - - PowerMock.replayAll(); - - connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS); - - PowerMock.verifyAll(); - } - - @Test(expected = NotFoundException.class) - public void testPutConnectorTaskConfigsConnectorNotFound() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); - - connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS); - - PowerMock.verifyAll(); - } - - private <T> void expectAndCallbackResult(final Capture<Callback<T>> cb, final T value) { - PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() { - @Override - public Void answer() throws Throwable { - cb.getValue().onCompletion(null, value); - return null; - } - }); - } - - private <T> void expectAndCallbackException(final Capture<Callback<T>> cb, final Throwable t) { - PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() { - @Override - public Void answer() throws Throwable { - cb.getValue().onCompletion(t, null); - return null; - } - }); - } - - private <T> void expectAndCallbackNotLeaderException(final Capture<Callback<T>> cb) { - expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL)); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java deleted file mode 100644 index f1bd317..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java +++ /dev/null @@ -1,337 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.standalone; - -import org.apache.kafka.copycat.connector.Connector; -import org.apache.kafka.copycat.connector.ConnectorContext; -import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.errors.AlreadyExistsException; -import org.apache.kafka.copycat.errors.NotFoundException; -import org.apache.kafka.copycat.runtime.ConnectorConfig; -import org.apache.kafka.copycat.runtime.Herder; -import org.apache.kafka.copycat.runtime.HerderConnectorContext; -import org.apache.kafka.copycat.runtime.TaskConfig; -import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; -import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; -import org.apache.kafka.copycat.sink.SinkConnector; -import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.source.SourceConnector; -import org.apache.kafka.copycat.source.SourceTask; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.FutureCallback; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -@RunWith(PowerMockRunner.class) -public class StandaloneHerderTest { - private static final String CONNECTOR_NAME = "test"; - private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2"); - private static final String TOPICS_LIST_STR = "topic1,topic2"; - private static final int DEFAULT_MAX_TASKS = 1; - - private StandaloneHerder herder; - @Mock protected Worker worker; - private Connector connector; - @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback; - - @Before - public void setup() { - worker = PowerMock.createMock(Worker.class); - herder = new StandaloneHerder(worker); - } - - @Test - public void testCreateSourceConnector() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); - - PowerMock.verifyAll(); - } - - @Test - public void testCreateConnectorAlreadyExists() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); - // First addition should succeed - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); - - // Second should fail - createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); - - PowerMock.verifyAll(); - } - - @Test - public void testCreateSinkConnector() throws Exception { - connector = PowerMock.createMock(BogusSinkConnector.class); - expectAdd(CONNECTOR_NAME, BogusSinkConnector.class, BogusSinkTask.class, true); - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class), false, createCallback); - - PowerMock.verifyAll(); - } - - @Test - public void testDestroyConnector() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); - expectDestroy(); - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); - FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new FutureCallback<>(); - herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb); - futureCb.get(1000L, TimeUnit.MILLISECONDS); - - // Second deletion should fail since the connector is gone - futureCb = new FutureCallback<>(); - herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb); - try { - futureCb.get(1000L, TimeUnit.MILLISECONDS); - fail("Should have thrown NotFoundException"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof NotFoundException); - } - - PowerMock.verifyAll(); - } - - @Test - public void testCreateAndStop() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); - // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked - expectStop(); - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); - herder.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testAccessors() throws Exception { - Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class); - - Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class); - Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class); - Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class); - Callback<List<TaskInfo>> taskConfigsCb = PowerMock.createMock(Callback.class); - - // Check accessors with empty worker - listConnectorsCb.onCompletion(null, Collections.EMPTY_LIST); - EasyMock.expectLastCall(); - connectorInfoCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<ConnectorInfo>isNull()); - EasyMock.expectLastCall(); - connectorConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<Map<String, String>>isNull()); - EasyMock.expectLastCall(); - taskConfigsCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<List<TaskInfo>>isNull()); - EasyMock.expectLastCall(); - - - // Create connector - connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); - - // Validate accessors with 1 connector - listConnectorsCb.onCompletion(null, Arrays.asList(CONNECTOR_NAME)); - EasyMock.expectLastCall(); - ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); - connectorInfoCb.onCompletion(null, connInfo); - EasyMock.expectLastCall(); - connectorConfigCb.onCompletion(null, connConfig); - EasyMock.expectLastCall(); - - TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(BogusSourceTask.class, false)); - taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo)); - EasyMock.expectLastCall(); - - - PowerMock.replayAll(); - - // All operations are synchronous for StandaloneHerder, so we don't need to actually wait after making each call - herder.connectors(listConnectorsCb); - herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); - herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); - herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); - - herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); - herder.connectors(listConnectorsCb); - herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); - herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); - herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); - - PowerMock.verifyAll(); - } - - @Test - public void testPutConnectorConfig() throws Exception { - Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class); - Map<String, String> newConnConfig = new HashMap<>(connConfig); - newConnConfig.put("foo", "bar"); - - Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class); - Callback<Herder.Created<ConnectorInfo>> putConnectorConfigCb = PowerMock.createMock(Callback.class); - - // Create - connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); - // Should get first config - connectorConfigCb.onCompletion(null, connConfig); - EasyMock.expectLastCall(); - // Update config, which requires stopping and restarting - worker.stopConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); - Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture(); - worker.addConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject()); - EasyMock.expectLastCall(); - // Generate same task config, which should result in no additional action to restart tasks - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST)) - .andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false))); - ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); - putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo)); - EasyMock.expectLastCall(); - // Should get new config - connectorConfigCb.onCompletion(null, newConnConfig); - EasyMock.expectLastCall(); - - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); - herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); - herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, putConnectorConfigCb); - assertEquals("bar", capturedConfig.getValue().originals().get("foo")); - herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); - - PowerMock.verifyAll(); - - } - - @Test(expected = UnsupportedOperationException.class) - public void testPutTaskConfigs() { - Callback<Void> cb = PowerMock.createMock(Callback.class); - - PowerMock.replayAll(); - - herder.putTaskConfigs(CONNECTOR_NAME, - Arrays.asList(Collections.singletonMap("config", "value")), - cb); - - PowerMock.verifyAll(); - } - - private void expectAdd(String name, Class<? extends Connector> connClass, Class<? extends Task> taskClass, - boolean sink) throws Exception { - Map<String, String> connectorProps = connectorConfig(name, connClass); - - worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class)); - PowerMock.expectLastCall(); - - ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); - createCallback.onCompletion(null, new Herder.Created<>(true, connInfo)); - PowerMock.expectLastCall(); - - // And we should instantiate the tasks. For a sink task, we should see added properties for - // the input topic partitions - Map<String, String> generatedTaskProps = taskConfig(taskClass, sink); - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST)) - .andReturn(Collections.singletonList(generatedTaskProps)); - - worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps)); - PowerMock.expectLastCall(); - } - - private void expectStop() { - worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0)); - EasyMock.expectLastCall(); - worker.stopConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); - } - - private void expectDestroy() { - expectStop(); - } - - - private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass) { - HashMap<String, String> connectorProps = new HashMap<>(); - connectorProps.put(ConnectorConfig.NAME_CONFIG, name); - connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR); - connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName()); - return connectorProps; - } - - private static Map<String, String> taskConfig(Class<? extends Task> taskClass, boolean sink) { - HashMap<String, String> generatedTaskProps = new HashMap<>(); - // Connectors can add any settings, so these are arbitrary - generatedTaskProps.put("foo", "bar"); - generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, taskClass.getName()); - if (sink) - generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR); - return generatedTaskProps; - } - - // We need to use a real class here due to some issue with mocking java.lang.Class - private abstract class BogusSourceConnector extends SourceConnector { - } - - private abstract class BogusSourceTask extends SourceTask { - } - - private abstract class BogusSinkConnector extends SinkConnector { - } - - private abstract class BogusSinkTask extends SourceTask { - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java deleted file mode 100644 index 2976c0a..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.copycat.util.Callback; -import org.easymock.EasyMock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.powermock.api.easymock.PowerMock; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class FileOffsetBackingStoreTest { - - FileOffsetBackingStore store; - Map<String, Object> props; - File tempFile; - - private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>(); - - static { - firstSet.put(buffer("key"), buffer("value")); - firstSet.put(null, null); - } - - @Before - public void setup() throws IOException { - store = new FileOffsetBackingStore(); - tempFile = File.createTempFile("fileoffsetbackingstore", null); - props = new HashMap<>(); - props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath()); - store.configure(props); - store.start(); - } - - @After - public void teardown() { - tempFile.delete(); - } - - @Test - public void testGetSet() throws Exception { - Callback<Void> setCallback = expectSuccessfulSetCallback(); - Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback(); - PowerMock.replayAll(); - - store.set(firstSet, setCallback).get(); - - Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad")), getCallback).get(); - assertEquals(buffer("value"), values.get(buffer("key"))); - assertEquals(null, values.get(buffer("bad"))); - - PowerMock.verifyAll(); - } - - @Test - public void testSaveRestore() throws Exception { - Callback<Void> setCallback = expectSuccessfulSetCallback(); - Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback(); - PowerMock.replayAll(); - - store.set(firstSet, setCallback).get(); - store.stop(); - - // Restore into a new store to ensure correct reload from scratch - FileOffsetBackingStore restore = new FileOffsetBackingStore(); - restore.configure(props); - restore.start(); - Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get(); - assertEquals(buffer("value"), values.get(buffer("key"))); - - PowerMock.verifyAll(); - } - - private static ByteBuffer buffer(String v) { - return ByteBuffer.wrap(v.getBytes()); - } - - private Callback<Void> expectSuccessfulSetCallback() { - @SuppressWarnings("unchecked") - Callback<Void> setCallback = PowerMock.createMock(Callback.class); - setCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class)); - PowerMock.expectLastCall(); - return setCallback; - } - - @SuppressWarnings("unchecked") - private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() { - Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class); - getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class)); - PowerMock.expectLastCall(); - return getCallback; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java deleted file mode 100644 index 7c25feb..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java +++ /dev/null @@ -1,522 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.copycat.data.Field; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.apache.kafka.copycat.data.Struct; -import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.KafkaBasedLog; -import org.apache.kafka.copycat.util.TestFuture; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(KafkaConfigStorage.class) -@PowerMockIgnore("javax.management.*") -public class KafkaConfigStorageTest { - private static final String TOPIC = "copycat-configs"; - private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>(); - - static { - DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC); - DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); - } - - private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2"); - private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2"); - private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2"); - - // Need a) connector with multiple tasks and b) multiple connectors - private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList( - new ConnectorTaskId("connector1", 0), - new ConnectorTaskId("connector1", 1), - new ConnectorTaskId("connector2", 0) - ); - private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0"); - - // Need some placeholders -- the contents don't matter here, just that they are restored properly - private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList( - Collections.singletonMap("config-key-one", "config-value-one"), - Collections.singletonMap("config-key-two", "config-value-two"), - Collections.singletonMap("config-key-three", "config-value-three") - ); - private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList( - new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), - new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), - new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)) - ); - private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList( - new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), - new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)) - ); - - private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR - = new Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2); - - // The exact format doesn't matter here since both conversions are mocked - private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList( - "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(), - "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(), - "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes() - ); - - @Mock - private Converter converter; - @Mock - private Callback<String> connectorReconfiguredCallback; - @Mock - private Callback<List<ConnectorTaskId>> tasksReconfiguredCallback; - @Mock - KafkaBasedLog<String, byte[]> storeLog; - private KafkaConfigStorage configStorage; - - private Capture<String> capturedTopic = EasyMock.newCapture(); - private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture(); - private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture(); - private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture(); - - private long logOffset = 0; - - @Before - public void setUp() { - configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, new String[]{"createKafkaBasedLog"}, - converter, connectorReconfiguredCallback, tasksReconfiguredCallback); - } - - @Test - public void testStartStop() throws Exception { - expectConfigure(); - expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); - expectStop(); - - PowerMock.replayAll(); - - configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS); - assertEquals(TOPIC, capturedTopic.getValue()); - assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); - assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); - assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); - assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); - - configStorage.start(); - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutConnectorConfig() throws Exception { - expectConfigure(); - expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); - - expectConvertWriteAndRead( - CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), - "properties", SAMPLE_CONFIGS.get(0)); - connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0)); - EasyMock.expectLastCall(); - - expectConvertWriteAndRead( - CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), - "properties", SAMPLE_CONFIGS.get(1)); - connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1)); - EasyMock.expectLastCall(); - - // Config deletion - expectConvertWriteAndRead( - CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null); - connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1)); - EasyMock.expectLastCall(); - - expectStop(); - - PowerMock.replayAll(); - - configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS); - configStorage.start(); - - // Null before writing - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(-1, configState.offset()); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); - - // Writing should block until it is written and read back from Kafka - configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)); - configState = configStorage.snapshot(); - assertEquals(1, configState.offset()); - assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); - - // Second should also block and all configs should still be available - configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1)); - configState = configStorage.snapshot(); - assertEquals(2, configState.offset()); - assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1))); - - // Deletion should remove the second one we added - configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null); - configState = configStorage.snapshot(); - assertEquals(3, configState.offset()); - assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutTaskConfigs() throws Exception { - expectConfigure(); - expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); - - // Task configs should read to end, write to the log, read to end, write root, then read to end again - expectReadToEnd(new LinkedHashMap<String, byte[]>()); - expectConvertWriteRead( - TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), - "properties", SAMPLE_CONFIGS.get(0)); - expectConvertWriteRead( - TASK_CONFIG_KEYS.get(1), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), - "properties", SAMPLE_CONFIGS.get(1)); - expectReadToEnd(new LinkedHashMap<String, byte[]>()); - expectConvertWriteRead( - COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), - "tasks", 2); // Starts with 0 tasks, after update has 2 - // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks - tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1))); - EasyMock.expectLastCall(); - - // Records to be read by consumer as it reads to the end of the log - LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>(); - serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); - serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1)); - serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2)); - expectReadToEnd(serializedConfigs); - - expectStop(); - - PowerMock.replayAll(); - - - configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS); - configStorage.start(); - - // Bootstrap as if we had already added the connector, but no tasks had been added yet - whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST); - - // Null before writing - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(-1, configState.offset()); - assertNull(configState.taskConfig(TASK_IDS.get(0))); - assertNull(configState.taskConfig(TASK_IDS.get(1))); - - // Writing task task configs should block until all the writes have been performed and the root record update - // has completed - Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>(); - taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)); - taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1)); - configStorage.putTaskConfigs(taskConfigs); - - // Validate root config by listing all connectors and tasks - configState = configStorage.snapshot(); - assertEquals(3, configState.offset()); - String connectorName = CONNECTOR_IDS.get(0); - assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors())); - assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName)); - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1))); - assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testRestore() throws Exception { - // Restoring data should notify only of the latest values after loading is complete. This also validates - // that inconsistent state is ignored. - - expectConfigure(); - // Overwrite each type at least once to ensure we see the latest data after loading - List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), - new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), - new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), - new ConsumerRecord<>(TOPIC, 0, 3, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), - new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), - // Connector after root update should make it through, task update shouldn't - new ConsumerRecord<>(TOPIC, 0, 5, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)), - new ConsumerRecord<>(TOPIC, 0, 6, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1)); - deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2)); - deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1)); - logOffset = 7; - expectStart(existingRecords, deserialized); - - // Shouldn't see any callbacks since this is during startup - - expectStop(); - - PowerMock.replayAll(); - - configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS); - configStorage.start(); - - // Should see a single connector and its config should be the last one seen anywhere in the log - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] - assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); - // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected - assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0))); - // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1))); - assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception { - // Test a case where a failure and compaction has left us in an inconsistent state when reading the log. - // We start out by loading an initial configuration where we started to write a task update and failed before - // writing an the commit, and then compaction cleaned up the earlier record. - - expectConfigure(); - List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), - // This is the record that has been compacted: - //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), - new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), - new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), - new ConsumerRecord<>(TOPIC, 0, 5, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); - deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); - deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); - deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1)); - logOffset = 6; - expectStart(existingRecords, deserialized); - - // One failed attempt to write new task configs - expectReadToEnd(new LinkedHashMap<String, byte[]>()); - - // Successful attempt to write new task config - expectReadToEnd(new LinkedHashMap<String, byte[]>()); - expectConvertWriteRead( - TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), - "properties", SAMPLE_CONFIGS.get(0)); - expectReadToEnd(new LinkedHashMap<String, byte[]>()); - expectConvertWriteRead( - COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), - "tasks", 1); // Updated to just 1 task - // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks - tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0))); - EasyMock.expectLastCall(); - // Records to be read by consumer as it reads to the end of the log - LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>(); - serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); - serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2)); - expectReadToEnd(serializedConfigs); - - - expectStop(); - - PowerMock.replayAll(); - - configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS); - configStorage.start(); - // After reading the log, it should have been in an inconsistent state - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(6, configState.offset()); // Should always be next to be read, not last committed - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list - assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0))); - // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] - assertNull(configState.taskConfig(TASK_IDS.get(0))); - assertNull(configState.taskConfig(TASK_IDS.get(1))); - assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors()); - - // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks) - try { - configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2))); - fail("Should have failed due to incomplete task set."); - } catch (KafkaException e) { - // expected - } - - // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case - // we are going to shrink the number of tasks to 1 - configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0))); - // Validate updated config - configState = configStorage.snapshot(); - // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written - // to the topic. Only the last call with 1 task config + 1 commit actually gets written. - assertEquals(8, configState.offset()); - assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); - assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - private void expectConfigure() throws Exception { - PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", - EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), - EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback)) - .andReturn(storeLog); - } - - // If non-empty, deserializations should be a LinkedHashMap - private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords, - final Map<byte[], Struct> deserializations) throws Exception { - storeLog.start(); - PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - for (ConsumerRecord<String, byte[]> rec : preexistingRecords) - capturedConsumedCallback.getValue().onCompletion(null, rec); - return null; - } - }); - for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) { - // Note null schema because default settings for internal serialization are schema-less - EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey()))) - .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue()))); - } - } - - private void expectStop() { - storeLog.stop(); - PowerMock.expectLastCall(); - } - - // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back - // from the log. Validate the data that is captured when the conversion is performed matches the specified data - // (by checking a single field's value) - private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized, - final String dataFieldName, final Object dataFieldValue) { - final Capture<Struct> capturedRecord = EasyMock.newCapture(); - if (serialized != null) - EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord))) - .andReturn(serialized); - storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized)); - PowerMock.expectLastCall(); - EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized))) - .andAnswer(new IAnswer<SchemaAndValue>() { - @Override - public SchemaAndValue answer() throws Throwable { - if (dataFieldName != null) - assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName)); - // Note null schema because default settings for internal serialization are schema-less - return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue())); - } - }); - } - - // This map needs to maintain ordering - private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) { - EasyMock.expect(storeLog.readToEnd()) - .andAnswer(new IAnswer<Future<Void>>() { - @Override - public Future<Void> answer() throws Throwable { - TestFuture<Void> future = new TestFuture<Void>(); - for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet()) - capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, entry.getKey(), entry.getValue())); - future.resolveOnGet((Void) null); - return future; - } - }); - } - - - private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized, - final String dataFieldName, final Object dataFieldValue) { - expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue); - LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>(); - recordsToRead.put(configKey, serialized); - expectReadToEnd(recordsToRead); - } - - // Manually insert a connector into config storage, updating the task configs, connector config, and root config - private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) { - Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs"); - for (int i = 0; i < taskConfigs.size(); i++) - storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i)); - - Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs"); - connectorConfigs.put(connectorName, connectorConfig); - - Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size()); - } - - // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted - private Map<String, Object> structToMap(Struct struct) { - HashMap<String, Object> result = new HashMap<>(); - for (Field field : struct.schema().fields()) - result.put(field.name(), struct.get(field)); - return result; - } - -}