This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 76963c9 SAMZA-2503 : Adding retry when getting ssp metadata from
kafka (#1339)
76963c9 is described below
commit 76963c945add609cf558048a5c02f1041b87c410
Author: MabelYC <[email protected]>
AuthorDate: Tue Apr 7 23:11:05 2020 -0700
SAMZA-2503 : Adding retry when getting ssp metadata from kafka (#1339)
---
.../samza/system/kafka/KafkaSystemAdmin.java | 74 +++++++++++++++++++---
.../system/kafka/TestKafkaSystemAdminWithMock.java | 43 +++++++++++++
2 files changed, 107 insertions(+), 10 deletions(-)
diff --git
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index ecb95a9..91d4b11 100644
---
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -298,24 +298,78 @@ public class KafkaSystemAdmin implements SystemAdmin {
@Override
public Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
Set<SystemStreamPartition> ssps) {
+ return getSSPMetadata(ssps,
+ new
ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
+ DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS,
DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS));
+ }
+
+ /**
+ * Given a set of SystemStreamPartition, fetch metadata from Kafka for each
+ * of them, and return a map from ssp to SystemStreamPartitionMetadata for
+ * each of them. This method will return null for oldest and newest offsets
+ * if a given SystemStreamPartition is empty. On failure this method retries
+ * with the given backoff, bounded by MAX_RETRIES_ON_EXCEPTION, and then throws a SamzaException.
+ * @param ssps the set of SystemStreamPartitions to fetch metadata for
+ * @param retryBackoff retry backoff strategy
+ * @return a map from ssp to sspMetadata which has offsets
+ */
+ Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
+ Set<SystemStreamPartition> ssps, ExponentialSleepStrategy retryBackoff) {
LOG.info("Fetching SSP metadata for: {}", ssps);
List<TopicPartition> topicPartitions = ssps.stream()
.map(ssp -> new TopicPartition(ssp.getStream(),
ssp.getPartition().getPartitionId()))
.collect(Collectors.toList());
- OffsetsMaps topicPartitionsMetadata =
fetchTopicPartitionsMetadata(topicPartitions);
+ Function1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
+ SystemStreamMetadata.SystemStreamPartitionMetadata>>
fetchTopicPartitionMetadataOperation =
+ new AbstractFunction1<ExponentialSleepStrategy.RetryLoop,
Map<SystemStreamPartition,
+ SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
- Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new
HashMap<>();
- for (SystemStreamPartition ssp : ssps) {
- String oldestOffset =
topicPartitionsMetadata.getOldestOffsets().get(ssp);
- String newestOffset =
topicPartitionsMetadata.getNewestOffsets().get(ssp);
- String upcomingOffset =
topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
+ @Override
+ public Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> apply(
+ ExponentialSleepStrategy.RetryLoop loop) {
+ OffsetsMaps topicPartitionsMetadata =
fetchTopicPartitionsMetadata(topicPartitions);
+
+ Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new
HashMap<>();
+ for (SystemStreamPartition ssp : ssps) {
+ String oldestOffset =
topicPartitionsMetadata.getOldestOffsets().get(ssp);
+ String newestOffset =
topicPartitionsMetadata.getNewestOffsets().get(ssp);
+ String upcomingOffset =
topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
+
+ sspToSSPMetadata.put(ssp,
+ new
SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset,
upcomingOffset));
+ }
+ loop.done();
+ return sspToSSPMetadata;
+ }
+ };
- sspToSSPMetadata.put(ssp,
- new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset,
newestOffset, upcomingOffset));
- }
- return sspToSSPMetadata;
+ Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>
onExceptionRetryOperation =
+ new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop,
BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Exception exception,
ExponentialSleepStrategy.RetryLoop loop) {
+ if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
+ LOG.warn(
+ String.format("Fetching SSP metadata for: %s threw an
exception. Retrying.", ssps), exception);
+ } else {
+ LOG.error(String.format("Fetching SSP metadata for: %s threw an
exception.", ssps), exception);
+ loop.done();
+ throw new SamzaException(exception);
+ }
+ return null;
+ }
+ };
+
+ Function0<Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata>> fallbackOperation =
+ new AbstractFunction0<Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
+ @Override
+ public Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> apply() {
+ throw new SamzaException("Failed to get SSP metadata");
+ }
+ };
+
+ return retryBackoff.run(fetchTopicPartitionMetadataOperation,
onExceptionRetryOperation).getOrElse(fallbackOperation);
}
/**
diff --git
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
index c10f152..25cab8c 100644
---
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
+++
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,6 +43,7 @@ import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.ExponentialSleepStrategy;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -308,4 +310,45 @@ public class TestKafkaSystemAdminWithMock {
kafkaSystemAdmin.getSystemStreamPartitionCounts(streamNames, cacheTTL);
}
+
+ @Test
+ public void testGetSSPMetadataWithRetry() {
+ SystemStreamPartition oneSSP = new SystemStreamPartition(TEST_SYSTEM,
VALID_TOPIC, new Partition(0));
+ SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM,
"otherTopic", new Partition(1));
+ ImmutableSet<SystemStreamPartition> ssps = ImmutableSet.of(oneSSP,
otherSSP);
+ List<TopicPartition> topicPartitions = ssps.stream()
+ .map(ssp -> new TopicPartition(ssp.getStream(),
ssp.getPartition().getPartitionId()))
+ .collect(Collectors.toList());
+ Map<TopicPartition, Long> testBeginningOffsets =
+ ImmutableMap.of(testTopicPartition0,
KAFKA_BEGINNING_OFFSET_FOR_PARTITION0, testTopicPartition1,
+ KAFKA_BEGINNING_OFFSET_FOR_PARTITION1);
+
+ when(mockKafkaConsumer.beginningOffsets(topicPartitions)).thenThrow(new
RuntimeException())
+ .thenReturn(testBeginningOffsets);
+ Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> sspMetadata =
+ kafkaSystemAdmin.getSSPMetadata(ssps, new ExponentialSleepStrategy(2,
+ 1, 1));
+
+ assertEquals("metadata should return for 2 topics", sspMetadata.size(), 2);
+
+ // called twice in total: the first call fails and the single retry succeeds
+ Mockito.verify(mockKafkaConsumer,
Mockito.times(2)).beginningOffsets(topicPartitions);
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testGetSSPMetadataShouldTerminateAfterFiniteRetriesOnException()
throws Exception{
+ SystemStreamPartition oneSSP = new SystemStreamPartition(TEST_SYSTEM,
VALID_TOPIC, new Partition(0));
+ SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM,
"otherTopic", new Partition(1));
+
+ ImmutableSet<SystemStreamPartition> ssps = ImmutableSet.of(oneSSP,
otherSSP);
+ List<TopicPartition> topicPartitions = ssps.stream()
+ .map(ssp -> new TopicPartition(ssp.getStream(),
ssp.getPartition().getPartitionId()))
+ .collect(Collectors.toList());
+
+ when(mockKafkaConsumer.beginningOffsets(topicPartitions)).thenThrow(new
RuntimeException())
+ .thenThrow(new RuntimeException());
+
+ kafkaSystemAdmin.getSSPMetadata(ssps, new ExponentialSleepStrategy(2,
+ 1, 1));
+ }
}