This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 718f41f411 Don't throw exception if partition count can't be fetched (#9249)
718f41f411 is described below
commit 718f41f4110998e7cb6bc60ad449711c722568c2
Author: Sajjad Moradi <[email protected]>
AuthorDate: Sat Aug 20 09:42:02 2022 -0700
Don't throw exception if partition count can't be fetched (#9249)
---
.../realtime/RealtimeConsumptionRateManager.java | 49 +++++++++++++++++----
.../RealtimeConsumptionRateManagerTest.java | 51 ++++++++++++++++++++--
2 files changed, 88 insertions(+), 12 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index fb9fb410f1..8a4dc9b40c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -55,7 +57,8 @@ public class RealtimeConsumptionRateManager {
}
private static class InstanceHolder {
- private static final RealtimeConsumptionRateManager INSTANCE = new
RealtimeConsumptionRateManager(buildCache());
+ private static final RealtimeConsumptionRateManager INSTANCE = new
RealtimeConsumptionRateManager(
+ buildCache(DEFAULT_PARTITION_COUNT_FETCHER,
CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES, TimeUnit.MINUTES));
}
public static RealtimeConsumptionRateManager getInstance() {
@@ -85,17 +88,27 @@ public class RealtimeConsumptionRateManager {
return new RateLimiterImpl(partitionRateLimit);
}
- private static LoadingCache<StreamConfig, Integer> buildCache() {
- return
CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES,
TimeUnit.MINUTES)
+ @VisibleForTesting
+ static LoadingCache<StreamConfig, Integer> buildCache(PartitionCountFetcher
partitionCountFetcher,
+ long duration, TimeUnit unit) {
+ return CacheBuilder.newBuilder().refreshAfterWrite(duration, unit)
.build(new CacheLoader<StreamConfig, Integer>() {
@Override
- public Integer load(StreamConfig streamConfig)
+ public Integer load(StreamConfig key)
throws Exception {
- String clientId = streamConfig.getTopicName() +
"-consumption.rate.manager";
- StreamConsumerFactory factory =
StreamConsumerFactoryProvider.create(streamConfig);
- try (StreamMetadataProvider streamMetadataProvider =
factory.createStreamMetadataProvider(clientId)) {
- return
streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000);
- }
+ // this method is called the first time cache is used for the given streamConfig
+ Integer count = partitionCountFetcher.fetch(key);
+ // if the count cannot be fetched, don't throw exception; return 1.
+ // The overall consumption rate will be higher, but we prefer that over not consuming at all.
+ return count != null ? count : 1;
+ }
+
+ @Override
+ public ListenableFuture<Integer> reload(StreamConfig key, Integer
oldValue)
+ throws Exception {
+ // if partition count fetcher cannot fetch the value, old value is returned
+ Integer count = partitionCountFetcher.fetch(key);
+ return Futures.immediateFuture(count != null ? count : oldValue);
}
});
}
@@ -131,4 +144,22 @@ public class RealtimeConsumptionRateManager {
return _rate;
}
}
+
+ @VisibleForTesting
+ @FunctionalInterface
+ interface PartitionCountFetcher {
+ Integer fetch(StreamConfig streamConfig);
+ }
+
+ @VisibleForTesting
+ static final PartitionCountFetcher DEFAULT_PARTITION_COUNT_FETCHER =
streamConfig -> {
+ String clientId = streamConfig.getTopicName() +
"-consumption.rate.manager";
+ StreamConsumerFactory factory =
StreamConsumerFactoryProvider.create(streamConfig);
+ try (StreamMetadataProvider streamMetadataProvider =
factory.createStreamMetadataProvider(clientId)) {
+ return
streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000);
+ } catch (Exception e) {
+ LOGGER.warn("Error fetching metadata for topic " +
streamConfig.getTopicName(), e);
+ return null;
+ }
+ };
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
index 47eddc5d1a..cb795f7213 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
@@ -21,13 +21,14 @@ package org.apache.pinot.core.data.manager.realtime;
import com.google.common.cache.LoadingCache;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.stream.StreamConfig;
import org.testng.annotations.Test;
-import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
-import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.NOOP_RATE_LIMITER;
-import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.RateLimiterImpl;
+import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.*;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -71,4 +72,48 @@ public class RealtimeConsumptionRateManagerTest {
rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_C,
TABLE_NAME);
assertEquals(rateLimiter, NOOP_RATE_LIMITER);
}
+
+ @Test
+ public void testBuildCache() throws Exception {
+ PartitionCountFetcher partitionCountFetcher =
mock(PartitionCountFetcher.class);
+ LoadingCache<StreamConfig, Integer> cache =
buildCache(partitionCountFetcher, 500, TimeUnit.MILLISECONDS);
+ when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(10);
+ when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(20);
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // call fetcher in
load method
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // call fetcher in
load method
+ assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // use cache
+ verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_A); // count
changes
+ verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_B); // count
changes
+ when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(11);
+ when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(21);
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 10); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_B), 20); // use cache
+ Thread.sleep(550); // wait till cache expires
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // call fetcher in
reload method
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_B), 21); // call fetcher in
reload method
+ assertEquals((int) cache.get(STREAM_CONFIG_B), 21); // use cache
+ verify(partitionCountFetcher, times(2)).fetch(STREAM_CONFIG_A);
+ verify(partitionCountFetcher, times(2)).fetch(STREAM_CONFIG_B);
+ when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(null); //
unsuccessful fetch
+ when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(22);
+ Thread.sleep(550); // wait till cache expires
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // call fetcher in
reload method
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_A), 11); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_B), 22); // call fetcher in
reload method
+ assertEquals((int) cache.get(STREAM_CONFIG_B), 22); // use cache
+ verify(partitionCountFetcher, times(3)).fetch(STREAM_CONFIG_A);
+ verify(partitionCountFetcher, times(3)).fetch(STREAM_CONFIG_B);
+
+ // unsuccessful fetch in the first call for config C
+ when(partitionCountFetcher.fetch(STREAM_CONFIG_C)).thenReturn(null); //
unsuccessful fetch
+ assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // call fetcher in load
method
+ assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache
+ assertEquals((int) cache.get(STREAM_CONFIG_C), 1); // use cache
+ verify(partitionCountFetcher, times(1)).fetch(STREAM_CONFIG_C);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]