lukecwik commented on a change in pull request #16495:
URL: https://github.com/apache/beam/pull/16495#discussion_r785134812
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -57,92 +65,58 @@ public static long weigh(Object o) {
/** An eviction listener that reduces the size of entries that are {@link
Shrinkable}. */
@VisibleForTesting
- static class ShrinkOnEviction implements RemovalListener<CompositeKey,
Object> {
+ static class ShrinkOnEviction implements RemovalListener<CompositeKey,
WeightedValue<Object>> {
private final
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
- CompositeKey, Object>
+ CompositeKey, WeightedValue<Object>>
cache;
+ private final LongAdder weightInBytes;
- ShrinkOnEviction(CacheBuilder<Object, Object> cacheBuilder) {
+ ShrinkOnEviction(
+ CacheBuilder<Object, WeightedValue<Object>> cacheBuilder, LongAdder
weightInBytes) {
this.cache = cacheBuilder.removalListener(this).build();
+ this.weightInBytes = weightInBytes;
}
public
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
- CompositeKey, Object>
+ CompositeKey, WeightedValue<Object>>
getCache() {
return cache;
}
@Override
- public void onRemoval(RemovalNotification<CompositeKey, Object>
removalNotification) {
+ public void onRemoval(
+ RemovalNotification<CompositeKey, WeightedValue<Object>>
removalNotification) {
+ weightInBytes.add(-removalNotification.getValue().getWeight());
if (removalNotification.wasEvicted()) {
- if (!(removalNotification.getValue() instanceof Cache.Shrinkable)) {
+ if (!(removalNotification.getValue().getValue() instanceof
Cache.Shrinkable)) {
return;
}
- Object updatedEntry = ((Shrinkable<?>)
removalNotification.getValue()).shrink();
+ Object updatedEntry = ((Shrinkable<?>)
removalNotification.getValue().getValue()).shrink();
if (updatedEntry != null) {
- cache.put(removalNotification.getKey(), updatedEntry);
+ cache.put(removalNotification.getKey(),
addWeightedValue(updatedEntry, weightInBytes));
}
}
}
}
/** A cache that never stores any values. */
public static <K, V> Cache<K, V> noop() {
- // We specifically use Guava cache since it allows for recursive
computeIfAbsent calls
- // preventing deadlock from occurring when a loading function mutates the
underlying cache
- return (Cache<K, V>)
- forCache(new
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(0)).getCache());
+ return forMaximumBytes(0L);
}
/** A cache that never evicts any values. */
public static <K, V> Cache<K, V> eternal() {
- // We specifically use Guava cache since it allows for recursive
computeIfAbsent calls
- // preventing deadlock from occurring when a loading function mutates the
underlying cache
- return (Cache<K, V>)
- forCache(
- new
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(Long.MAX_VALUE)).getCache());
+ return forMaximumBytes(Long.MAX_VALUE);
}
/**
* Uses the specified {@link PipelineOptions} to configure and return a
cache instance based upon
* parameters within {@link SdkHarnessOptions}.
*/
public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
- // We specifically use Guava cache since it allows for recursive
computeIfAbsent calls
- // preventing deadlock from occurring when a loading function mutates the
underlying cache
- return (Cache<K, V>)
- forCache(
- new ShrinkOnEviction(
- CacheBuilder.newBuilder()
- .maximumWeight(
-
options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb()
- * 1024L
- * 1024L
- / WEIGHT_RATIO)
- .weigher(
- new Weigher<Object, Object>() {
-
- @Override
- public int weigh(Object key, Object value) {
- long size;
- if (value instanceof Weighted) {
- size = Caches.weigh(key) + ((Weighted)
value).getWeight();
- } else {
- size = Caches.weigh(key) +
Caches.weigh(value);
- }
- size = size / WEIGHT_RATIO + 1;
- if (size >= Integer.MAX_VALUE) {
- LOG.warn(
- "Entry with size {} MiBs inserted into
the cache. This is larger than the maximum individual entry size of {} MiBs.
The cache will under report its memory usage by the difference. This may lead
to OutOfMemoryErrors.",
- (size / 1048576L) + 1,
- 2 * WEIGHT_RATIO * 1024);
- return Integer.MAX_VALUE;
- }
- return (int) size;
- }
- }))
- .getCache());
+ return forMaximumBytes(
+ options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() *
1048576L);
Review comment:
swapped to bit shift.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]