C0urante commented on code in PR #14102:
URL: https://github.com/apache/kafka/pull/14102#discussion_r1290659476


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3929,543 +3008,636 @@ public void testPollDurationOnSlowConnectorOperations() {
         final int rebalanceDelayMs = 20000;
         final long operationDelayMs = 10000;
         final long maxPollWaitMs = rebalanceDelayMs - operationDelayMs;
-        EasyMock.expect(member.memberId()).andStubReturn("member");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+        when(member.memberId()).thenReturn("member");
+        
when(member.currentProtocolVersion()).thenReturn(connectProtocolVersion);
 
         // Assign the connector to this worker, and have it start
         expectRebalance(Collections.emptyList(), Collections.emptyList(), 
ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), 
Collections.emptyList(), rebalanceDelayMs);
         expectConfigRefreshAndSnapshot(SNAPSHOT);
-        Capture<Callback<TargetState>> onFirstStart = newCapture();
-        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), 
EasyMock.anyObject(),
-                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), 
capture(onFirstStart));
-        PowerMock.expectLastCall().andAnswer(() -> {
+        ArgumentCaptor<Callback<TargetState>> onFirstStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             time.sleep(operationDelayMs);
             onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
-        member.wakeup();
-        PowerMock.expectLastCall();
-        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> 
TASK_CONFIGS);
-        // We should poll for less than the delay - time to start the connector, meaning that a long connector start
-        // does not delay the poll timeout
-        member.poll(leq(maxPollWaitMs));
-        PowerMock.expectLastCall();
+        }).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG), any(), 
eq(herder), eq(TargetState.STARTED), onFirstStart.capture());
+        expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> 
TASK_CONFIGS);
+
+        herder.tick();
 
         // Rebalance again due to config update
-        member.wakeup();
-        PowerMock.expectLastCall();
         expectRebalance(Collections.emptyList(), Collections.emptyList(), 
ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), 
Collections.emptyList(), rebalanceDelayMs);
-        
EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
-
-        worker.stopAndAwaitConnector(CONN1);
-        PowerMock.expectLastCall();
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
-        Capture<Callback<TargetState>> onSecondStart = newCapture();
-        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), 
EasyMock.anyObject(),
-                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), 
capture(onSecondStart));
-        PowerMock.expectLastCall().andAnswer(() -> {
+        
when(configBackingStore.snapshot()).thenReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+        doNothing().when(worker).stopAndAwaitConnector(CONN1);
+
+        ArgumentCaptor<Callback<TargetState>> onSecondStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             time.sleep(operationDelayMs);
             onSecondStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
-        member.wakeup();
-        PowerMock.expectLastCall();
-        member.poll(leq(maxPollWaitMs));
-        PowerMock.expectLastCall();
+        }).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG_UPDATED), 
any(), eq(herder), eq(TargetState.STARTED), onSecondStart.capture());
+        expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, 
invocation -> TASK_CONFIGS);
+
+        configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated 
config
+        herder.tick();
 
         // Third tick should resolve all outstanding requests
         expectRebalance(Collections.emptyList(), Collections.emptyList(), 
ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), 
Collections.emptyList(), rebalanceDelayMs);
         // which includes querying the connector task configs after the update
-        expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, () -> {
+        expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, 
invocation -> {
             time.sleep(operationDelayMs);
             return TASK_CONFIGS;
         });
-        member.poll(leq(maxPollWaitMs));
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-        herder.tick();
-        configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated 
config
-        herder.tick();
         herder.tick();
-        PowerMock.verifyAll();
+
+        // We should poll for less than the delay - time to start the connector, meaning that a long connector start
+        // does not delay the poll timeout
+        verify(member, times(3)).poll(leq(maxPollWaitMs));
+        verify(worker, times(2)).startConnector(eq(CONN1), any(), any(), 
eq(herder), eq(TargetState.STARTED), any());
+        verifyNoMoreInteractions(member, worker, configBackingStore);
     }
 
     @Test
     public void 
shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping()
 {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
         expectConfigRefreshAndSnapshot(SNAPSHOT);
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
-
-        PowerMock.replayAll();
 
         herder.startAndStopExecutor.shutdown();
         assertThrows(RejectedExecutionException.class, herder::tick);
+    }
+
+    @Test
+    public void 
testTaskReconfigurationRetriesWithConnectorTaskConfigsException() {
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        when(worker.isRunning(CONN1)).thenReturn(true);
+        when(worker.getPlugins()).thenReturn(plugins);
+
+        SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
+
+        when(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .thenThrow(new ConnectException("Failed to generate task 
configs"))
+                .thenThrow(new ConnectException("Failed to generate task 
configs"))
+                .thenReturn(TASK_CONFIGS);
+
+        expectAndVerifyTaskReconfigurationRetries();
+    }
+
+    @Test
+    public void 
testTaskReconfigurationRetriesWithLeaderRequestForwardingException() {
+        herder = mock(DistributedHerder.class, 
withSettings().defaultAnswer(CALLS_REAL_METHODS).useConstructor(new 
DistributedConfig(HERDER_CONFIG),
+                worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, 
configBackingStore, member, MEMBER_URL, restClient, metrics, time,
+                noneConnectorClientConfigOverridePolicy, 
Collections.emptyList(), new MockSynchronousExecutor(), new AutoCloseable[]{}));
+
+        rebalanceListener = herder.new RebalanceListener(time);
+
+        when(member.memberId()).thenReturn("member");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        when(worker.isRunning(CONN1)).thenReturn(true);
+        when(worker.getPlugins()).thenReturn(plugins);
+
+        SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
+
+        List<Map<String, String>> changedTaskConfigs = new 
ArrayList<>(TASK_CONFIGS);
+        changedTaskConfigs.add(TASK_CONFIG);
+        when(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig)).thenReturn(changedTaskConfigs);
+
+        when(restClient.httpRequest(any(), eq("POST"), any(), any(), any(), 
any(), any()))
+                .thenThrow(new ConnectException("Request to leader to 
reconfigure connector tasks failed"))
+                .thenThrow(new ConnectException("Request to leader to 
reconfigure connector tasks failed"))
+                .thenReturn(null);
+
+        expectAndVerifyTaskReconfigurationRetries();
+    }
+
+    private void expectAndVerifyTaskReconfigurationRetries() {
+        // initial tick
+        herder.tick();
+        herder.requestTaskReconfiguration(CONN1);
+        // process the task reconfiguration request in this tick
+        herder.tick();
+        // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
+        time.sleep(250);
+        herder.tick();
+        // advance the time by 500ms so that the task reconfiguration request with double the initial retry backoff is processed
+        time.sleep(500);
+        herder.tick();
+
+        // 1. end of initial tick when no request has been added to the herder queue yet
+        // 2. the third task reconfiguration request is expected to pass; so expect no more retries (a Long.MAX_VALUE poll
+        //    timeout indicates that there is no herder request currently in the queue)
+        verify(member, times(2)).poll(eq(Long.MAX_VALUE));
+
+        // task reconfiguration herder request with initial retry backoff
+        verify(member).poll(eq(250L));
+
+        // task reconfiguration herder request with double the initial retry backoff
+        verify(member).poll(eq(500L));
+
+        verifyNoMoreInteractions(member, worker);
+    }
+
+    @Test
+    public void processRestartRequestsFailureSuppression() {
+        doNothing().when(member).wakeup();
+
+        final String connectorName = "foo";
+        RestartRequest restartRequest = new RestartRequest(connectorName, 
false, false);
+        doThrow(new 
RuntimeException()).when(herder).buildRestartPlan(restartRequest);
+
+        configUpdateListener.onRestartRequest(restartRequest);
+        assertEquals(1, herder.pendingRestartRequests.size());
+        herder.processRestartRequests();
+        assertTrue(herder.pendingRestartRequests.isEmpty());
+
+        verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
+    }
+
+    @Test
+    public void processRestartRequestsDequeue() {
+        doNothing().when(member).wakeup();
+        
doReturn(Optional.empty()).when(herder).buildRestartPlan(any(RestartRequest.class));
+
+        RestartRequest restartRequest = new RestartRequest("foo", false, 
false);
+        configUpdateListener.onRestartRequest(restartRequest);
+        restartRequest = new RestartRequest("bar", false, false);
+        configUpdateListener.onRestartRequest(restartRequest);
+        assertEquals(2, herder.pendingRestartRequests.size());
+        herder.processRestartRequests();
+        assertTrue(herder.pendingRestartRequests.isEmpty());
+    }
+
+    @Test
+    public void preserveHighestImpactRestartRequest() {
+        doNothing().when(member).wakeup();
+
+        final String connectorName = "foo";
+        RestartRequest restartRequest = new RestartRequest(connectorName, 
false, false);
+        configUpdateListener.onRestartRequest(restartRequest);
+
+        // Will overwrite as this is higher impact
+        restartRequest = new RestartRequest(connectorName, false, true);
+        configUpdateListener.onRestartRequest(restartRequest);
+        assertEquals(1, herder.pendingRestartRequests.size());
+        
assertFalse(herder.pendingRestartRequests.get(connectorName).onlyFailed());
+        
assertTrue(herder.pendingRestartRequests.get(connectorName).includeTasks());
+
+        // Will be ignored as the existing request has higher impact
+        restartRequest = new RestartRequest(connectorName, true, false);
+        configUpdateListener.onRestartRequest(restartRequest);
+        assertEquals(1, herder.pendingRestartRequests.size());
+        // Compare against existing request
+        
assertFalse(herder.pendingRestartRequests.get(connectorName).onlyFailed());
+        
assertTrue(herder.pendingRestartRequests.get(connectorName).includeTasks());
+
+        verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidation() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        
when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(ExactlyOnceSupport.SUPPORTED);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertEquals(Collections.emptyList(), errors);
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() 
{
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        
when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(ExactlyOnceSupport.UNSUPPORTED);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertEquals(
+                Collections.singletonList("The connector does not support 
exactly-once semantics with the provided configuration."),
+                errors);
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidationOnUnknownConnector() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(null);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("The connector does not implement the 
API required for preflight validation of exactly-once source support."));
+        assertEquals(1, errors.size());
+    }
+
+    @Test
+    public void 
testExactlyOnceSourceSupportValidationHandlesConnectorErrorsGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        String errorMessage = "time to add a new unit test :)";
+        when(connectorMock.exactlyOnceSupport(eq(config))).thenThrow(new 
NullPointerException(errorMessage));
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
 
-        PowerMock.verifyAll();
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains(errorMessage));
+        assertEquals(1, errors.size());
     }
 
     @Test
-    public void 
testTaskReconfigurationRetriesWithConnectorTaskConfigsException() {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
-        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
-        expectConfigRefreshAndSnapshot(SNAPSHOT);
+    public void 
testExactlyOnceSourceSupportValidationWhenExactlyOnceNotEnabledOnWorker() {
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
 
-        // end of initial tick
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        
when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(ExactlyOnceSupport.SUPPORTED);
 
-        member.wakeup();
-        PowerMock.expectLastCall().anyTimes();
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
 
-        member.ensureActive();
-        PowerMock.expectLastCall().anyTimes();
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertEquals(
+                Collections.singletonList("This worker does not have 
exactly-once source support enabled."),
+                errors);
+    }
 
-        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+    @Test
+    public void 
testExactlyOnceSourceSupportValidationHandlesInvalidValuesGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
"invalid");
 
-        SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig))
-                .andThrow(new ConnectException("Failed to generate task 
configs")).times(2);
+        SourceConnector connectorMock = mock(SourceConnector.class);
 
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig)).andReturn(TASK_CONFIGS);
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
 
-        expectAndVerifyTaskReconfigurationRetries();
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("String must be one of (case 
insensitive): "));
+        assertEquals(1, errors.size());
     }
 
     @Test
-    public void 
testTaskReconfigurationRetriesWithLeaderRequestForwardingException() {
-        herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"connectorType", "updateDeletedConnectorStatus", 
"updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan", 
"recordRestarting"},
-                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, 
KAFKA_CLUSTER_ID,
-                statusBackingStore, configBackingStore, member, MEMBER_URL, 
restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
-                Collections.emptyList(), new MockSynchronousExecutor(), new 
AutoCloseable[]{});
+    public void testConnectorTransactionBoundaryValidation() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
CONNECTOR.toString());
 
-        rebalanceListener = herder.new RebalanceListener(time);
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        when(connectorMock.canDefineTransactionBoundaries(eq(config)))
+                .thenReturn(ConnectorTransactionBoundaries.SUPPORTED);
 
-        EasyMock.expect(member.memberId()).andStubReturn("member");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
-        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
-        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
 
-        // end of initial tick
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertEquals(Collections.emptyList(), errors);
+    }
 
-        member.wakeup();
-        PowerMock.expectLastCall().anyTimes();
+    @Test
+    public void 
testConnectorTransactionBoundaryValidationOnUnsupportedConnector() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
CONNECTOR.toString());
 
-        member.ensureActive();
-        PowerMock.expectLastCall().anyTimes();
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        when(connectorMock.canDefineTransactionBoundaries(eq(config)))
+                .thenReturn(ConnectorTransactionBoundaries.UNSUPPORTED);
 
-        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
 
-        SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("The connector does not support 
connector-defined transaction boundaries with the given configuration."));
+        assertEquals(1, errors.size());
+    }
 
-        List<Map<String, String>> changedTaskConfigs = new 
ArrayList<>(TASK_CONFIGS);
-        changedTaskConfigs.add(TASK_CONFIG);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig)).andReturn(changedTaskConfigs).anyTimes();
+    @Test
+    public void 
testConnectorTransactionBoundaryValidationHandlesConnectorErrorsGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
CONNECTOR.toString());
 
-        EasyMock.expect(restClient.httpRequest(
-                EasyMock.anyString(), EasyMock.eq("POST"), 
EasyMock.anyObject(HttpHeaders.class),
-                EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject(SecretKey.class), EasyMock.anyString())
-        ).andThrow(new ConnectException("Request to leader to reconfigure 
connector tasks failed")).times(2);
+        SourceConnector connectorMock = mock(SourceConnector.class);
+        String errorMessage = "Wait I thought we tested for this?";
+        
when(connectorMock.canDefineTransactionBoundaries(eq(config))).thenThrow(new 
ConnectException(errorMessage));
 
-        EasyMock.expect(restClient.httpRequest(
-                EasyMock.anyString(), EasyMock.eq("POST"), 
EasyMock.anyObject(HttpHeaders.class),
-                EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject(SecretKey.class), EasyMock.anyString())
-        ).andReturn(null);
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
 
-        expectAndVerifyTaskReconfigurationRetries();
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains(errorMessage));
+        assertEquals(1, errors.size());
     }
 
-    private void expectAndVerifyTaskReconfigurationRetries() {
-        // task reconfiguration herder request with initial retry backoff
-        member.poll(EasyMock.eq(250L));
-        PowerMock.expectLastCall();
-
-        // task reconfiguration herder request with double the initial retry backoff
-        member.poll(EasyMock.eq(500L));
-        PowerMock.expectLastCall();
-
-        // the third task reconfiguration request is expected to pass; so expect no more retries (a Long.MAX_VALUE poll
-        // timeout indicates that there is no herder request currently in the queue)
-        member.poll(EasyMock.eq(Long.MAX_VALUE));
-        PowerMock.expectLastCall();
+    @Test
+    public void 
testConnectorTransactionBoundaryValidationHandlesInvalidValuesGracefully() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, 
"CONNECTOR.toString()");
 
-        PowerMock.replayAll();
+        SourceConnector connectorMock = mock(SourceConnector.class);
 
-        // initial tick
-        herder.tick();
-        herder.requestTaskReconfiguration(CONN1);
-        // process the task reconfiguration request in this tick
-        herder.tick();
-        // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
-        time.sleep(250);
-        herder.tick();
-        // advance the time by 500ms so that the task reconfiguration request with double the initial retry backoff is processed
-        time.sleep(500);
-        herder.tick();
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
 
-        PowerMock.verifyAll();
+        List<String> errors = 
validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages();
+        assertFalse(errors.isEmpty());
+        assertTrue(
+                "Error message did not contain expected text: " + 
errors.get(0),
+                errors.get(0).contains("String must be one of (case 
insensitive): "));
+        assertEquals(1, errors.size());
     }
 
     @Test
     public void testConnectorOffsets() throws Exception {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
         expectConfigRefreshAndSnapshot(SNAPSHOT);
-        expectAnyTicks();
 
-        member.wakeup();
-        PowerMock.expectLastCall();
+        
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+        doNothing().when(member).poll(anyLong());
 
-        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
+        herder.tick();
+
+        when(configBackingStore.snapshot()).thenReturn(SNAPSHOT);
         ConnectorOffsets offsets = new 
ConnectorOffsets(Collections.singletonList(new ConnectorOffset(
                 Collections.singletonMap("partitionKey", "partitionValue"),
                 Collections.singletonMap("offsetKey", "offsetValue"))));
-        Capture<Callback<ConnectorOffsets>> callbackCapture = newCapture();
-        worker.connectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), 
capture(callbackCapture));
-        PowerMock.expectLastCall().andAnswer(() -> {
+
+        ArgumentCaptor<Callback<ConnectorOffsets>> callbackCapture = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             callbackCapture.getValue().onCompletion(null, offsets);
             return null;
-        });
+        }).when(worker).connectorOffsets(eq(CONN1), eq(CONN1_CONFIG), 
callbackCapture.capture());
 
-        PowerMock.replayAll();
-
-        herder.tick();
         FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
         herder.connectorOffsets(CONN1, cb);
         herder.tick();
         assertEquals(offsets, cb.get(1000, TimeUnit.MILLISECONDS));
 
-        PowerMock.verifyAll();
+        verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
     }
 
     @Test
     public void testModifyConnectorOffsetsUnknownConnector() throws Exception {
         // Get the initial assignment
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
         expectConfigRefreshAndSnapshot(SNAPSHOT);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // Now handle the connector offsets modification request
-        member.wakeup();
-        PowerMock.expectLastCall();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        expectConfigRefreshAndSnapshot(SNAPSHOT);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
 
         herder.tick();
+
+        // Now handle the connector offsets modification request
         FutureCallback<Message> callback = new FutureCallback<>();
         herder.modifyConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
         herder.tick();
         ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(e.getCause() instanceof NotFoundException);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testModifyOffsetsConnectorNotInStoppedState() throws Exception 
{
         // Get the initial assignment
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
         expectConfigRefreshAndSnapshot(SNAPSHOT);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // Now handle the connector offsets modification request
-        member.wakeup();
-        PowerMock.expectLastCall();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        expectConfigRefreshAndSnapshot(SNAPSHOT);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
 
         herder.tick();
+
+        // Now handle the connector offsets modification request
         FutureCallback<Message> callback = new FutureCallback<>();
         herder.modifyConnectorOffsets(CONN1, null, callback);
         herder.tick();
         ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(e.getCause() instanceof BadRequestException);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testModifyOffsetsNotLeader() throws Exception {
         // Get the initial assignment
-        EasyMock.expect(member.memberId()).andStubReturn("member");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        when(member.memberId()).thenReturn("member");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
         expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // Now handle the connector offsets modification request
-        member.wakeup();
-        PowerMock.expectLastCall();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
 
         herder.tick();
+
+        // Now handle the connector offsets modification request
         FutureCallback<Message> callback = new FutureCallback<>();
         herder.modifyConnectorOffsets(CONN1, new HashMap<>(), callback);
         herder.tick();
         ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(e.getCause() instanceof NotLeaderException);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testModifyOffsetsSinkConnector() throws Exception {
-        EasyMock.reset(herder);
-        
EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SINK).anyTimes();
-        PowerMock.expectPrivate(herder, 
"updateDeletedConnectorStatus").andVoid().anyTimes();
-        PowerMock.expectPrivate(herder, 
"updateDeletedTaskStatus").andVoid().anyTimes();
-
+        when(herder.connectorType(any())).thenReturn(ConnectorType.SINK);
         // Get the initial assignment
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
         expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
+
+        herder.tick();
 
         // Now handle the alter connector offsets request
         Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
                 Collections.singletonMap("partitionKey", "partitionValue"),
                 Collections.singletonMap("offsetKey", "offsetValue"));
-        member.wakeup();
-        PowerMock.expectLastCall();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
-        worker.modifyConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture));
+
+        ArgumentCaptor<Callback<Message>> workerCallbackCapture = 
ArgumentCaptor.forClass(Callback.class);
         Message msg = new Message("The offsets for this connector have been 
altered successfully");
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doAnswer(invocation -> {
             workerCallbackCapture.getValue().onCompletion(null, msg);
             return null;
-        });
+        }).when(worker).modifyConnectorOffsets(eq(CONN1), eq(CONN1_CONFIG), 
eq(offsets), workerCallbackCapture.capture());
 
-        PowerMock.replayAll();
-
-        herder.tick();
         FutureCallback<Message> callback = new FutureCallback<>();
         herder.alterConnectorOffsets(CONN1, offsets, callback);
         herder.tick();
         assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
         assertEquals("The offsets for this connector have been altered 
successfully", msg.message());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testModifyOffsetsSourceConnectorExactlyOnceDisabled() throws 
Exception {
         // Get the initial assignment
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
         expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
+        herder.tick();
 
         // Now handle the reset connector offsets request
-        member.wakeup();
-        PowerMock.expectLastCall();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
-        worker.modifyConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), isNull(), capture(workerCallbackCapture));
+        ArgumentCaptor<Callback<Message>> workerCallbackCapture = 
ArgumentCaptor.forClass(Callback.class);
         Message msg = new Message("The offsets for this connector have been 
reset successfully");
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doAnswer(invocation -> {
             workerCallbackCapture.getValue().onCompletion(null, msg);
             return null;
-        });
+        }).when(worker).modifyConnectorOffsets(eq(CONN1), eq(CONN1_CONFIG), 
isNull(), workerCallbackCapture.capture());
 
-        PowerMock.replayAll();
-
-        herder.tick();
         FutureCallback<Message> callback = new FutureCallback<>();
         herder.resetConnectorOffsets(CONN1, callback);
         herder.tick();
         assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
         assertEquals("The offsets for this connector have been reset 
successfully", msg.message());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testModifyOffsetsSourceConnectorExactlyOnceEnabled() throws 
Exception {
         // Setup herder with exactly-once support for source connectors enabled
         herder = exactlyOnceHerder();
         rebalanceListener = herder.new RebalanceListener(time);
-        PowerMock.expectPrivate(herder, 
"updateDeletedConnectorStatus").andVoid().anyTimes();
-        PowerMock.expectPrivate(herder, 
"updateDeletedTaskStatus").andVoid().anyTimes();
-
         // Get the initial assignment
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
         expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall().anyTimes();
+        doNothing().when(member).poll(anyLong());
+
+        herder.tick();
 
         // Now handle the alter connector offsets request
-        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
-                Collections.singletonMap("partitionKey", "partitionValue"),
-                Collections.singletonMap("offsetKey", "offsetValue"));
-        member.wakeup();
-        PowerMock.expectLastCall().anyTimes();
-        member.ensureActive();
-        PowerMock.expectLastCall().anyTimes();
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        
EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes();
+        doNothing().when(member).ensureActive();
+        when(herder.connectorType(any())).thenReturn(ConnectorType.SOURCE);
 
         // Expect a round of zombie fencing to occur
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
-        KafkaFuture<Void> workerFencingFuture = 
EasyMock.mock(KafkaFuture.class);
-        KafkaFuture<Void> herderFencingFuture = 
EasyMock.mock(KafkaFuture.class);
-        EasyMock.expect(worker.fenceZombies(CONN1, 
SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), 
CONN1_CONFIG)).andReturn(workerFencingFuture);
-        
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void,
 Void>>anyObject())).andReturn(herderFencingFuture);
-
-        // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active
-        // fencing list. The other is the callback passed from DistributedHerder::modifyConnectorOffsets in order to
-        // queue up the actual alter offsets request if the zombie fencing succeeds.
-        for (int i = 0; i < 2; i++) {
-            Capture<KafkaFuture.BiConsumer<Void, Throwable>> 
herderFencingCallback = EasyMock.newCapture();
-            
EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(()
 -> {
-                herderFencingCallback.getValue().accept(null, null);
-                return null;
-            });
-        }
-
-        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
-        Message msg = new Message("The offsets for this connector have been 
altered successfully");
-        worker.modifyConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture));
-        EasyMock.expectLastCall().andAnswer(() -> {
-            workerCallbackCapture.getValue().onCompletion(null, msg);
+        KafkaFuture<Void> workerFencingFuture = mock(KafkaFuture.class);
+        KafkaFuture<Void> herderFencingFuture = mock(KafkaFuture.class);
+        when(worker.fenceZombies(CONN1, 
SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), 
CONN1_CONFIG)).thenReturn(workerFencingFuture);
+        
when(workerFencingFuture.thenApply(any(KafkaFuture.BaseFunction.class))).thenReturn(herderFencingFuture);
+
+        ArgumentCaptor<KafkaFuture.BiConsumer<Void, Throwable>> 
herderFencingCallback = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
+        
when(herderFencingFuture.whenComplete(herderFencingCallback.capture())).thenAnswer(invocation
 -> {
+            herderFencingCallback.getValue().accept(null, null);
             return null;
         });
 
-        // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we
-        // already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is
-        // refreshed once at the beginning of the DistributedHerder::modifyConnectorOffsets method, once before checking
-        // whether zombie fencing is required, and once before actually proceeding to alter connector offsets.
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
-        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
-        Capture<Callback<Message>> workerCallbackCapture2 = 
Capture.newInstance();
-        worker.modifyConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture2));
-        EasyMock.expectLastCall().andAnswer(() -> {
-            workerCallbackCapture2.getValue().onCompletion(null, msg);
-            return null;
-        });
+        ArgumentCaptor<Callback<Message>> workerCallbackCapture = 
ArgumentCaptor.forClass(Callback.class);
+        Message msg = new Message("The offsets for this connector have been 
altered successfully");
 
-        PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap("partitionKey", "partitionValue"),
+                Collections.singletonMap("offsetKey", "offsetValue"));
+        doAnswer(invocation -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        }).when(worker).modifyConnectorOffsets(eq(CONN1), eq(CONN1_CONFIG), 
eq(offsets), workerCallbackCapture.capture());
 
-        herder.tick();
         FutureCallback<Message> callback = new FutureCallback<>();
         herder.alterConnectorOffsets(CONN1, offsets, callback);
-        // Process the zombie fencing request
-        herder.tick();
-        // Process the alter offsets request
+        // Process the zombie fencing request that is queued up first followed by the actual alter offsets request

Review Comment:
   Great, thanks for humoring me!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to