This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 76963c9  SAMZA-2503 : Adding retry when getting ssp metadata from 
kafka (#1339)
76963c9 is described below

commit 76963c945add609cf558048a5c02f1041b87c410
Author: MabelYC <[email protected]>
AuthorDate: Tue Apr 7 23:11:05 2020 -0700

    SAMZA-2503 : Adding retry when getting ssp metadata from kafka (#1339)
---
 .../samza/system/kafka/KafkaSystemAdmin.java       | 74 +++++++++++++++++++---
 .../system/kafka/TestKafkaSystemAdminWithMock.java | 43 +++++++++++++
 2 files changed, 107 insertions(+), 10 deletions(-)

diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index ecb95a9..91d4b11 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -298,24 +298,78 @@ public class KafkaSystemAdmin implements SystemAdmin {
   @Override
   public Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
       Set<SystemStreamPartition> ssps) {
+    return getSSPMetadata(ssps, // delegate using the default exponential backoff settings
+        new 
ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
+            DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, 
DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS));
+  }
+
+  /**
+   * Given a set of SystemStreamPartition, fetch metadata from Kafka for each
+   * of them, and return a map from ssp to SystemStreamPartitionMetadata for
+   * each of them. This method will return null for oldest and newest offsets
+   * if a given SystemStreamPartition is empty. Failed fetches are retried with
+   * retryBackoff; once MAX_RETRIES_ON_EXCEPTION is reached, a SamzaException wrapping the failure is thrown.
+   * @param ssps the SystemStreamPartitions to fetch metadata for
+   * @param retryBackoff backoff strategy that drives the retry loop
+   * @return a map from each ssp to its SystemStreamPartitionMetadata (oldest, newest and upcoming offsets)
+   */
+  Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
+      Set<SystemStreamPartition> ssps, ExponentialSleepStrategy retryBackoff) {
 
     LOG.info("Fetching SSP metadata for: {}", ssps);
     List<TopicPartition> topicPartitions = ssps.stream()
         .map(ssp -> new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId()))
         .collect(Collectors.toList());
 
-    OffsetsMaps topicPartitionsMetadata = 
fetchTopicPartitionsMetadata(topicPartitions);
+    Function1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
+        SystemStreamMetadata.SystemStreamPartitionMetadata>> 
fetchTopicPartitionMetadataOperation =
+        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, 
Map<SystemStreamPartition,
+            SystemStreamMetadata.SystemStreamPartitionMetadata>>() { // one attempt: fetch offsets for every ssp
 
-    Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new 
HashMap<>();
-    for (SystemStreamPartition ssp : ssps) {
-      String oldestOffset = 
topicPartitionsMetadata.getOldestOffsets().get(ssp);
-      String newestOffset = 
topicPartitionsMetadata.getNewestOffsets().get(ssp);
-      String upcomingOffset = 
topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
+          @Override
+          public Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> apply(
+              ExponentialSleepStrategy.RetryLoop loop) {
+            OffsetsMaps topicPartitionsMetadata = 
fetchTopicPartitionsMetadata(topicPartitions);
+
+            Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new 
HashMap<>();
+            for (SystemStreamPartition ssp : ssps) {
+              String oldestOffset = 
topicPartitionsMetadata.getOldestOffsets().get(ssp);
+              String newestOffset = 
topicPartitionsMetadata.getNewestOffsets().get(ssp);
+              String upcomingOffset = 
topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
+
+              sspToSSPMetadata.put(ssp,
+                  new 
SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, 
upcomingOffset));
+            }
+            loop.done(); // success: mark the retry loop finished so this result is used
+            return sspToSSPMetadata;
+          }
+        };
 
-      sspToSSPMetadata.put(ssp,
-          new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, 
newestOffset, upcomingOffset));
-    }
-    return sspToSSPMetadata;
+    Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> 
onExceptionRetryOperation =
+        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, 
BoxedUnit>() {
+          @Override
+          public BoxedUnit apply(Exception exception, 
ExponentialSleepStrategy.RetryLoop loop) {
+            if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) { // still within the retry budget: log and let the loop retry
+              LOG.warn(
+                  String.format("Fetching SSP metadata for: %s threw an 
exception. Retrying.", ssps), exception);
+            } else {
+              LOG.error(String.format("Fetching SSP metadata for: %s threw an 
exception.", ssps), exception);
+              loop.done(); // retry budget exhausted: stop the loop before propagating the failure
+              throw new SamzaException(exception);
+            }
+            return null; // null stands in for Scala's Unit return value
+          }
+        };
+
+    Function0<Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata>> fallbackOperation =
+        new AbstractFunction0<Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
+          @Override
+          public Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> apply() {
+            throw new SamzaException("Failed to get SSP metadata"); // only reached if run(...) yields no result (presumably loop exit without success, e.g. interruption — verify)
+          }
+        };
+
+    return retryBackoff.run(fetchTopicPartitionMetadataOperation, 
onExceptionRetryOperation).getOrElse(fallbackOperation);
   }
 
   /**
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
index c10f152..25cab8c 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,6 +43,7 @@ import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.ExponentialSleepStrategy;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -308,4 +310,45 @@ public class TestKafkaSystemAdminWithMock {
 
     kafkaSystemAdmin.getSystemStreamPartitionCounts(streamNames, cacheTTL);
   }
+
+  @Test
+  public void testGetSSPMetadataWithRetry() { // one transient failure followed by a successful retry
+    SystemStreamPartition oneSSP = new SystemStreamPartition(TEST_SYSTEM, 
VALID_TOPIC, new Partition(0));
+    SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, 
"otherTopic", new Partition(1));
+    ImmutableSet<SystemStreamPartition> ssps = ImmutableSet.of(oneSSP, 
otherSSP);
+    List<TopicPartition> topicPartitions = ssps.stream()
+        .map(ssp -> new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId()))
+        .collect(Collectors.toList());
+    Map<TopicPartition, Long> testBeginningOffsets =
+        ImmutableMap.of(testTopicPartition0, 
KAFKA_BEGINNING_OFFSET_FOR_PARTITION0, testTopicPartition1,
+            KAFKA_BEGINNING_OFFSET_FOR_PARTITION1);
+
+    when(mockKafkaConsumer.beginningOffsets(topicPartitions)).thenThrow(new 
RuntimeException())
+        .thenReturn(testBeginningOffsets); // first call throws, second call returns offsets
+    Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> sspMetadata =
+        kafkaSystemAdmin.getSSPMetadata(ssps, new ExponentialSleepStrategy(2,
            1, 1)); // multiplier 2 with 1 ms initial/max delay keeps the retry fast
+
+    assertEquals("metadata should return for 2 topics", sspMetadata.size(), 2); // NOTE(review): JUnit order is (message, expected, actual); arguments look swapped
+
+    // called twice: the first attempt throws and the single retry succeeds
+    Mockito.verify(mockKafkaConsumer, 
Mockito.times(2)).beginningOffsets(topicPartitions);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetSSPMetadataShouldTerminateAfterFiniteRetriesOnException() 
throws Exception{ // every fetch fails, so the retry budget must eventually run out
+    SystemStreamPartition oneSSP = new SystemStreamPartition(TEST_SYSTEM, 
VALID_TOPIC, new Partition(0));
+    SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, 
"otherTopic", new Partition(1));
+
+    ImmutableSet<SystemStreamPartition> ssps = ImmutableSet.of(oneSSP, 
otherSSP);
+    List<TopicPartition> topicPartitions = ssps.stream()
+        .map(ssp -> new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId()))
+        .collect(Collectors.toList());
+
+    when(mockKafkaConsumer.beginningOffsets(topicPartitions)).thenThrow(new 
RuntimeException())
+        .thenThrow(new RuntimeException()); // Mockito repeats the last stub, so every later call throws too
+
+    kafkaSystemAdmin.getSSPMetadata(ssps, new ExponentialSleepStrategy(2,
        1, 1)); // expected to surface as SamzaException once the retry limit is reached
+  }
 }

Reply via email to