This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 42a7c62  KAFKA-12343: Handle exceptions better in TopicAdmin, 
including UnsupportedVersionException (#10158)
42a7c62 is described below

commit 42a7c625b281847fb51926fa4b01a09398225264
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Fri Feb 19 14:43:32 2021 -0600

    KAFKA-12343: Handle exceptions better in TopicAdmin, including 
UnsupportedVersionException (#10158)
    
    Extracted the KafkaBasedLog logic that reads end offsets into a separate 
method to make it easier to test. Also changed the TopicAdmin.endOffsets method 
to throw UnsupportedVersionException, LeaderNotAvailableException, and 
TimeoutException directly rather than wrapping them in Connect exceptions, to 
better conform with the consumer method and with how KafkaBasedLog retries 
those exceptions.
    
    Added new tests to verify various scenarios and errors.
    
    Author: Randall Hauch <rha...@gmail.com>
    Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Chia-Ping 
Tsai <chia7...@gmail.com>
---
 .../apache/kafka/connect/util/KafkaBasedLog.java   | 65 +++++++++-------
 .../org/apache/kafka/connect/util/TopicAdmin.java  | 16 ++--
 .../kafka/connect/util/KafkaBasedLogTest.java      | 89 +++++++++++++++++++++-
 .../apache/kafka/connect/util/TopicAdminTest.java  | 10 +--
 4 files changed, 139 insertions(+), 41 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6a2a787..6e2350f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -28,7 +28,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -318,32 +320,8 @@ public class KafkaBasedLog<K, V> {
     }
 
     private void readToLogEnd() {
-        log.trace("Reading to end of offset log");
-
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets;
-        // Note that we'd prefer to not use the consumer to find the end 
offsets for the assigned topic partitions.
-        // That is because it's possible that the consumer is already blocked 
waiting for new records to appear, when
-        // the consumer is already at the end. In such cases, using 
'consumer.endOffsets(...)' will block until at least
-        // one more record becomes available, meaning we can't even check 
whether we're at the end offset.
-        // Since all we're trying to do here is get the end offset, we should 
use the supplied admin client
-        // (if available)
-        // (which prevents 'consumer.endOffsets(...)'
-        // from
-
-        // Deprecated constructors do not provide an admin supplier, so the 
admin is potentially null.
-        if (admin != null) {
-            // Use the admin client to immediately find the end offsets for 
the assigned topic partitions.
-            // Unlike using the consumer
-            endOffsets = admin.endOffsets(assignment);
-        } else {
-            // The admin may be null if older deprecated constructor is used, 
though AK Connect currently always provides an admin client.
-            // Using the consumer is not ideal, because when the topic has low 
volume, the 'poll(...)' method called from the
-            // work thread may have blocked the consumer while waiting for 
more records (even when there are none).
-            // In such cases, this call to the consumer to simply find the end 
offsets will block even though we might already be
-            // at the end offset.
-            endOffsets = consumer.endOffsets(assignment);
-        }
+        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
@@ -366,6 +344,37 @@ public class KafkaBasedLog<K, V> {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        // Note that we'd prefer to not use the consumer to find the end 
offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked 
waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 
'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check 
whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should 
use the supplied admin client
+        // (if available) to obtain the end offsets for the given topic 
partitions.
+
+        // Deprecated constructors do not provide an admin supplier, so the 
admin is potentially null.
+        if (admin != null) {
+            // Use the admin client to immediately find the end offsets for 
the assigned topic partitions.
+            // Unlike the consumer, this is not held up by a blocked poll(...)
+            try {
+                return admin.endOffsets(assignment);
+            } catch (UnsupportedVersionException e) {
+                // This may happen with really old brokers that don't support 
the auto topic creation
+                // field in metadata requests
+                log.debug("Reading to end of log offsets with consumer since 
admin client is unsupported: {}", e.getMessage());
+                // Forget the reference to the admin so that we won't even try 
to use the admin the next time this method is called
+                admin = null;
+                // continue and let the consumer handle the read
+            }
+            // Other errors, like timeouts and retriable exceptions are 
intentionally propagated
+        }
+        // The admin may be null if older deprecated constructor is used or if 
the admin client is using a broker that doesn't
+        // support getting the end offsets (e.g., 0.10.x). In such cases, we 
should use the consumer, which is not ideal (see above).
+        return consumer.endOffsets(assignment);
+    }
 
     private class WorkThread extends Thread {
         public WorkThread() {
@@ -390,7 +399,11 @@ public class KafkaBasedLog<K, V> {
                             log.trace("Finished read to end log for topic {}", 
topic);
                         } catch (TimeoutException e) {
                             log.warn("Timeout while reading log to end for 
topic '{}'. Retrying automatically. " +
-                                "This may occur when brokers are unavailable 
or unreachable. Reason: {}", topic, e.getMessage());
+                                     "This may occur when brokers are 
unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                            continue;
+                        } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                            log.warn("Retriable error while reading log to end 
for topic '{}'. Retrying automatically. " +
+                                     "Reason: {}", topic, e.getMessage());
                             continue;
                         } catch (WakeupException e) {
                             // Either received another get() call and need to 
retry reading to end of log or stop() was
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 8b7e456..07cd1e5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -651,8 +651,12 @@ public class TopicAdmin implements AutoCloseable {
      * @param partitions the topic partitions
      * @return the map of offset for each topic partition, or an empty map if 
the supplied partitions
      *         are null or empty
-     * @throws RetriableException if a retriable error occurs, the operation 
takes too long, or the
-     *         thread is interrupted while attempting to perform this operation
+     * @throws UnsupportedVersionException if the admin client cannot read end 
offsets
+     * @throws TimeoutException if the offset metadata could not be fetched 
before the amount of time allocated
+     *         by {@code request.timeout.ms} expires, and this call can be 
retried
+     * @throws LeaderNotAvailableException if the leader was not available and 
this call can be retried
+     * @throws RetriableException if a retriable error occurs, or the thread 
is interrupted while attempting 
+     *         to perform this operation
      * @throws ConnectException if a non retriable error occurs
      */
     public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> 
partitions) {
@@ -677,13 +681,15 @@ public class TopicAdmin implements AutoCloseable {
                     // Should theoretically never happen, because this method 
is the same as what the consumer uses and therefore
                     // should exist in the broker since before the admin 
client was added
                     String msg = String.format("API to get the get the end 
offsets for topic '%s' is unsupported on brokers at %s", topic, 
bootstrapServers());
-                    throw new ConnectException(msg, e);
+                    throw new UnsupportedVersionException(msg, e);
                 } else if (cause instanceof TimeoutException) {
                     String msg = String.format("Timed out while waiting to get 
end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
-                    throw new RetriableException(msg, e);
+                    throw new TimeoutException(msg, e);
                 } else if (cause instanceof LeaderNotAvailableException) {
                     String msg = String.format("Unable to get end offsets 
during leader election for topic '%s' on brokers at %s", topic, 
bootstrapServers());
-                    throw new RetriableException(msg, e);
+                    throw new LeaderNotAvailableException(msg, e);
+                } else if (cause instanceof 
org.apache.kafka.common.errors.RetriableException) {
+                    throw (org.apache.kafka.common.errors.RetriableException) 
cause;
                 } else {
                     String msg = String.format("Error while getting end 
offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
                     throw new ConnectException(msg, e);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 080f943..a1a78f1 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -61,11 +62,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -117,6 +120,8 @@ public class KafkaBasedLogTest {
     @Mock
     private KafkaProducer<String, String> producer;
     private MockConsumer<String, String> consumer;
+    @Mock
+    private TopicAdmin admin;
 
     private Map<TopicPartition, List<ConsumerRecord<String, String>>> 
consumedRecords = new HashMap<>();
     private Callback<ConsumerRecord<String, String>> consumedCallback = new 
Callback<ConsumerRecord<String, String>>() {
@@ -536,15 +541,91 @@ public class KafkaBasedLogTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testReadEndOffsetsUsingAdmin() throws Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andReturn(endOffsets).times(2);
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertEquals(endOffsets, store.readEndOffsets(tps));
+    }
+
+    @Test
+    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws 
Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        // Getting end offsets using the admin client should fail with 
unsupported version
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andThrow(new 
UnsupportedVersionException("too old"));
+
+        // Falls back to the consumer
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertEquals(endOffsets, store.readEndOffsets(tps));
+    }
+
+    @Test
+    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws 
Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        // Getting end offsets upon startup should work fine
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+        // Getting end offsets using the admin client should fail with leader 
not available
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andThrow(new 
LeaderNotAvailableException("retry"));
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertThrows(LeaderNotAvailableException.class, () -> 
store.readEndOffsets(tps));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void setupWithAdmin() {
+        Supplier<TopicAdmin> adminSupplier = () -> admin;
+        java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
+        store = PowerMock.createPartialMock(KafkaBasedLog.class, new 
String[]{"createConsumer", "createProducer"},
+                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, 
consumedCallback, time, initializer);
+    }
+
+    private void expectProducerAndConsumerCreate() throws Exception {
+        PowerMock.expectPrivate(store, "createProducer")
+                 .andReturn(producer);
+        PowerMock.expectPrivate(store, "createConsumer")
+                 .andReturn(consumer);
+    }
 
     private void expectStart() throws Exception {
         initializer.run();
         EasyMock.expectLastCall().times(1);
 
-        PowerMock.expectPrivate(store, "createProducer")
-                .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                .andReturn(consumer);
+        expectProducerAndConsumerCreate();
     }
 
     private void expectStop() {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index b245be2..f874a4d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -521,7 +521,7 @@ public class TopicAdminTest {
     }
 
     @Test
-    public void 
endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() throws 
Exception {
+    public void 
endOffsetsShouldFailWithUnsupportedVersionWhenVersionUnsupportedErrorOccurs() 
throws Exception {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);
@@ -531,14 +531,13 @@ public class TopicAdminTest {
 
         // Then the topic admin should throw exception
         TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        ConnectException e = assertThrows(ConnectException.class, () -> {
+        UnsupportedVersionException e = 
assertThrows(UnsupportedVersionException.class, () -> {
             admin.endOffsets(tps);
         });
-        assertTrue(e.getMessage().contains("is unsupported on brokers"));
     }
 
     @Test
-    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() 
throws Exception {
+    public void 
endOffsetsShouldFailWithTimeoutExceptionWhenTimeoutErrorOccurs() throws 
Exception {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);
@@ -548,10 +547,9 @@ public class TopicAdminTest {
 
         // Then the topic admin should throw exception
         TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        ConnectException e = assertThrows(ConnectException.class, () -> {
+        TimeoutException e = assertThrows(TimeoutException.class, () -> {
             admin.endOffsets(tps);
         });
-        assertTrue(e.getMessage().contains("Timed out while waiting"));
     }
 
     @Test

Reply via email to