[ https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361546#comment-16361546 ]

ASF GitHub Bot commented on KAFKA-5944:
---------------------------------------

hachikuji closed pull request #3965: KAFKA-5944: Unit tests for handling SASL authentication failures in clients
URL: https://github.com/apache/kafka/pull/3965
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d843414fd7a..65255fe9648 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ public FutureResponse(Node node,
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
+    private final Map<Node, AuthenticationException> authenticationException = 
new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from 
a different thread
     private final Queue<ClientRequest> requests = new 
ConcurrentLinkedDeque<>();
     // Use concurrent queue for responses so that responses may be updated 
during poll() from a different thread.
@@ -102,7 +104,7 @@ public boolean isReady(Node node, long now) {
 
     @Override
     public boolean ready(Node node, long now) {
-        if (isBlackedOut(node))
+        if (isBlackedOut(node) || authenticationException(node) != null)
             return false;
         ready.add(node.idString());
         return true;
@@ -117,6 +119,12 @@ public void blackout(Node node, long duration) {
         blackedOut.put(node, time.milliseconds() + duration);
     }
 
+    public void authenticationFailed(Node node, long duration) {
+        authenticationException.put(node, (AuthenticationException) 
Errors.SASL_AUTHENTICATION_FAILED.exception());
+        disconnect(node.idString());
+        blackout(node, duration);
+    }
+
     private boolean isBlackedOut(Node node) {
         if (blackedOut.containsKey(node)) {
             long expiration = blackedOut.get(node);
@@ -137,7 +145,7 @@ public boolean connectionFailed(Node node) {
 
     @Override
     public AuthenticationException authenticationException(Node node) {
-        return null;
+        return authenticationException.get(node);
     }
 
     @Override
@@ -347,6 +355,7 @@ public void reset() {
         responses.clear();
         futureResponses.clear();
         metadataUpdates.clear();
+        authenticationException.clear();
     }
 
     public void prepareMetadataUpdate(Cluster cluster, Set<String> 
unavailableTopics) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 186ccf06cb5..f08a99b6ddc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -248,6 +249,75 @@ public void testInvalidTopicNames() throws Exception {
         }
     }
 
+    @Test
+    public void 
testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws 
Exception {
+        AdminClientUnitTestEnv env = mockClientEnv();
+        Node node = env.cluster().controller();
+        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+        env.kafkaClient().setNode(node);
+        env.kafkaClient().authenticationFailed(node, 300);
+
+        callAdminClientApisAndExpectAnAuthenticationError(env);
+
+        // wait less than the blackout period, the connection should fail and 
the authentication error should remain
+        env.time().sleep(30);
+        assertTrue(env.kafkaClient().connectionFailed(node));
+        callAdminClientApisAndExpectAnAuthenticationError(env);
+
+        env.close();
+    }
+
+    private void 
callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) 
throws InterruptedException {
+        env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
+
+        try {
+            env.adminClient().createTopics(
+                    Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                    new CreateTopicsOptions().timeoutMs(10000)).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+        }
+
+        try {
+            Map<String, NewPartitions> counts = new HashMap<>();
+            counts.put("my_topic", NewPartitions.increaseTo(3));
+            counts.put("other_topic", NewPartitions.increaseTo(3, 
asList(asList(2), asList(3))));
+            env.adminClient().createPartitions(counts).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().createAcls(asList(ACL1, ACL2)).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().describeAcls(FILTER1).values().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().describeConfigs(Collections.singleton(new 
ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+        }
+    }
+
     private static final AclBinding ACL1 = new AclBinding(new 
Resource(ResourceType.TOPIC, "mytopic3"),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW));
     private static final AclBinding ACL2 = new AclBinding(new 
Resource(ResourceType.TOPIC, "mytopic4"),
@@ -579,7 +649,7 @@ public static KafkaAdminClient 
createInternal(AdminClientConfig config, KafkaAdm
         private int numTries = 0;
 
         private int failuresInjected = 0;
-        
+
         @Override
         public KafkaAdminClient.TimeoutProcessor create(long now) {
             return new FailureInjectingTimeoutProcessor(now);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index d47124f8f3e..be8db2bf55c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -1431,6 +1432,88 @@ public boolean conditionMet() {
         }
     }
 
+    @Test
+    public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        Map<String, Integer> tpCounts = new HashMap<>();
+        tpCounts.put(topic, 1);
+        Cluster cluster = TestUtils.singletonCluster(tpCounts);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        client.authenticationFailed(node, 300);
+        PartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, 
true, autoCommitIntervalMs);
+
+        consumer.subscribe(Collections.singleton(topic));
+        callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
+
+        time.sleep(30); // wait less than the blackout period
+        assertTrue(client.connectionFailed(node));
+        callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
+
+        client.requests().clear();
+        consumer.close(0, TimeUnit.MILLISECONDS);
+    }
+
+    private void 
callConsumerApisAndExpectAnAuthenticationError(KafkaConsumer<?, ?> consumer, 
TopicPartition partition) {
+        try {
+            consumer.partitionsFor("some other topic");
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.beginningOffsets(Collections.singleton(partition));
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.endOffsets(Collections.singleton(partition));
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.poll(10);
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
+        offset.put(partition, new OffsetAndMetadata(10L));
+
+        try {
+            consumer.commitSync(offset);
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.committed(partition);
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+    }
+
     private ConsumerRebalanceListener getConsumerRebalanceListener(final 
KafkaConsumer<String, String> consumer) {
         return new ConsumerRebalanceListener() {
             @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 7eaca9826d9..1c88803e26c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -519,6 +520,30 @@ public void testWakeupInOnJoinComplete() throws Exception {
         awaitFirstHeartbeat(heartbeatReceived);
     }
 
+    @Test
+    public void 
testEnsureCoordinatorReadyWithinBlackoutPeriodAfterAuthenticationFailure() {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
+        mockClient.authenticationFailed(node, 300);
+
+        try {
+            coordinator.ensureCoordinatorReady();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        mockTime.sleep(30); // wait less than the blackout period
+        assertTrue(mockClient.connectionFailed(node));
+
+        try {
+            coordinator.ensureCoordinatorReady();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+    }
+
     private AtomicBoolean prepareFirstHeartbeat() {
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c49339b6525..fdaa6b3f522 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -33,6 +33,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
@@ -1503,6 +1504,28 @@ public void testRefreshOffsetWithNoFetchableOffsets() {
         assertEquals(null, subscriptions.committed(t1p));
     }
 
+    @Test
+    public void 
testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure() {
+        client.authenticationFailed(node, 300);
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        time.sleep(30); // wait less than the blackout period
+        assertTrue(client.connectionFailed(node));
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+    }
+
     @Test
     public void testProtocolMetadataOrder() {
         RoundRobinAssignor roundRobin = new RoundRobinAssignor();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 93c6acd9c40..904270ec489 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
@@ -69,6 +70,24 @@ public void send() {
         assertEquals(Errors.NONE, response.error());
     }
 
+    @Test
+    public void sendWithinBlackoutPeriodAfterAuthenticationFailure() throws 
InterruptedException {
+        client.authenticationFailed(node, 300);
+        client.prepareResponse(heartbeatResponse(Errors.NONE));
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, 
heartbeat());
+        consumerClient.poll(future);
+        assertTrue(future.failed());
+        assertTrue("Expected only an authentication error.", 
future.exception() instanceof AuthenticationException);
+
+        time.sleep(30); // wait less than the blackout period
+        assertTrue(client.connectionFailed(node));
+
+        final RequestFuture<ClientResponse> future2 = 
consumerClient.send(node, heartbeat());
+        consumerClient.poll(future2);
+        assertTrue(future2.failed());
+        assertTrue("Expected only an authentication error.", 
future2.exception() instanceof AuthenticationException);
+    }
+
     @Test
     public void multiSend() {
         client.prepareResponse(heartbeatResponse(Errors.NONE));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add unit tests for handling of authentication failures in clients
> -----------------------------------------------------------------
>
>                 Key: KAFKA-5944
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5944
>             Project: Kafka
>          Issue Type: Test
>          Components: clients
>            Reporter: Rajini Sivaram
>            Assignee: Vahid Hashemian
>            Priority: Major
>             Fix For: 2.0.0
>
>
> KAFKA-5854 improves the handling of authentication failures in clients and 
> has added integration tests and some basic client-side tests that create 
> actual connections to a mock server. It would be good to add a set of tests 
> for producers, consumers, etc. that use MockClient to cover various 
> scenarios more extensively.
> cc [~hachikuji] [~vahid]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to