MabelYC commented on a change in pull request #1339: SAMZA-2503 : Adding retry when getting ssp metadata from kafka
URL: https://github.com/apache/samza/pull/1339#discussion_r405171434
##########
File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
##########
@@ -298,24 +298,78 @@ public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop l
@Override
public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
Set<SystemStreamPartition> ssps) {
+ return getSSPMetadata(ssps,
+ new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
+ DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS));
+ }
+
+ /**
+ * Given a set of SystemStreamPartition, fetch metadata from Kafka for each
+ * of them, and return a map from ssp to SystemStreamPartitionMetadata for
+ * each of them. This method will return null for oldest and newest offsets
+ * if a given SystemStreamPartition is empty. This method will block and
+ * retry indefinitely until it gets a successful response from Kafka.
+ * @param ssps a set of strings of SSP
+ * @param retryBackoff retry backoff strategy
+ * @return a map from ssp to sspMetadata which has offsets
+ */
+ public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
Review comment:
Changed it to a private method and added a unit test that covers the retry.
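
For readers following the thread, the pattern under discussion (a public method that delegates to an overload taking the backoff strategy, so a test can inject very short delays) can be sketched roughly as below. This is a standalone illustration and not the code in the PR: `RetrySketch`, `Backoff`, `fetchWithRetry`, and the default values are all hypothetical, and Samza's own `ExponentialSleepStrategy` is deliberately not used here.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /** Hypothetical stand-in for a backoff strategy: the delay grows by a multiplier up to a cap. */
    static final class Backoff {
        final double multiplier;
        final long initialDelayMs;
        final long maxDelayMs;

        Backoff(double multiplier, long initialDelayMs, long maxDelayMs) {
            this.multiplier = multiplier;
            this.initialDelayMs = initialDelayMs;
            this.maxDelayMs = maxDelayMs;
        }

        /** Delay before the next try; attempt 0 yields the initial delay. */
        long delayForAttempt(int attempt) {
            double delay = initialDelayMs * Math.pow(multiplier, attempt);
            return (long) Math.min(delay, maxDelayMs);
        }
    }

    /** Public entry point: uses default backoff values (assumed here, not taken from Samza). */
    public static <T> T fetchWithRetry(Callable<T> fetch) throws InterruptedException {
        return fetchWithRetry(fetch, new Backoff(2.0, 500L, 10_000L));
    }

    /** Non-public overload: the caller supplies the backoff, so a test can keep delays tiny. */
    static <T> T fetchWithRetry(Callable<T> fetch, Backoff backoff) throws InterruptedException {
        int attempt = 0;
        while (true) {
            try {
                return fetch.call();
            } catch (InterruptedException e) {
                // Do not swallow interruption; let the caller stop the loop.
                throw e;
            } catch (Exception e) {
                // Any other failure is treated as transient: sleep, then retry indefinitely.
                Thread.sleep(backoff.delayForAttempt(attempt++));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds; millisecond delays keep the run fast.
        String result = fetchWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "metadata";
        }, new Backoff(2.0, 1L, 4L));
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

A unit test in this style would count the invocations of the failing callable and assert that the call eventually returns once the simulated failures stop.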
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services