lukecwik commented on a change in pull request #16495:
URL: https://github.com/apache/beam/pull/16495#discussion_r785134812



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -57,92 +65,58 @@ public static long weigh(Object o) {
 
   /** An eviction listener that reduces the size of entries that are {@link Shrinkable}. */
   @VisibleForTesting
-  static class ShrinkOnEviction implements RemovalListener<CompositeKey, 
Object> {
+  static class ShrinkOnEviction implements RemovalListener<CompositeKey, 
WeightedValue<Object>> {
 
     private final 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
-            CompositeKey, Object>
+            CompositeKey, WeightedValue<Object>>
         cache;
+    private final LongAdder weightInBytes;
 
-    ShrinkOnEviction(CacheBuilder<Object, Object> cacheBuilder) {
+    ShrinkOnEviction(
+        CacheBuilder<Object, WeightedValue<Object>> cacheBuilder, LongAdder 
weightInBytes) {
       this.cache = cacheBuilder.removalListener(this).build();
+      this.weightInBytes = weightInBytes;
     }
 
     public 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
-            CompositeKey, Object>
+            CompositeKey, WeightedValue<Object>>
         getCache() {
       return cache;
     }
 
     @Override
-    public void onRemoval(RemovalNotification<CompositeKey, Object> 
removalNotification) {
+    public void onRemoval(
+        RemovalNotification<CompositeKey, WeightedValue<Object>> 
removalNotification) {
+      weightInBytes.add(-removalNotification.getValue().getWeight());
       if (removalNotification.wasEvicted()) {
-        if (!(removalNotification.getValue() instanceof Cache.Shrinkable)) {
+        if (!(removalNotification.getValue().getValue() instanceof 
Cache.Shrinkable)) {
           return;
         }
-        Object updatedEntry = ((Shrinkable<?>) 
removalNotification.getValue()).shrink();
+        Object updatedEntry = ((Shrinkable<?>) 
removalNotification.getValue().getValue()).shrink();
         if (updatedEntry != null) {
-          cache.put(removalNotification.getKey(), updatedEntry);
+          cache.put(removalNotification.getKey(), 
addWeightedValue(updatedEntry, weightInBytes));
         }
       }
     }
   }
 
   /** A cache that never stores any values. */
   public static <K, V> Cache<K, V> noop() {
-    // We specifically use Guava cache since it allows for recursive 
computeIfAbsent calls
-    // preventing deadlock from occurring when a loading function mutates the 
underlying cache
-    return (Cache<K, V>)
-        forCache(new 
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(0)).getCache());
+    return forMaximumBytes(0L);
   }
 
   /** A cache that never evicts any values. */
   public static <K, V> Cache<K, V> eternal() {
-    // We specifically use Guava cache since it allows for recursive 
computeIfAbsent calls
-    // preventing deadlock from occurring when a loading function mutates the 
underlying cache
-    return (Cache<K, V>)
-        forCache(
-            new 
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(Long.MAX_VALUE)).getCache());
+    return forMaximumBytes(Long.MAX_VALUE);
   }
 
   /**
    * Uses the specified {@link PipelineOptions} to configure and return a 
cache instance based upon
    * parameters within {@link SdkHarnessOptions}.
    */
   public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
-    // We specifically use Guava cache since it allows for recursive 
computeIfAbsent calls
-    // preventing deadlock from occurring when a loading function mutates the 
underlying cache
-    return (Cache<K, V>)
-        forCache(
-            new ShrinkOnEviction(
-                    CacheBuilder.newBuilder()
-                        .maximumWeight(
-                            
options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb()
-                                * 1024L
-                                * 1024L
-                                / WEIGHT_RATIO)
-                        .weigher(
-                            new Weigher<Object, Object>() {
-
-                              @Override
-                              public int weigh(Object key, Object value) {
-                                long size;
-                                if (value instanceof Weighted) {
-                                  size = Caches.weigh(key) + ((Weighted) 
value).getWeight();
-                                } else {
-                                  size = Caches.weigh(key) + 
Caches.weigh(value);
-                                }
-                                size = size / WEIGHT_RATIO + 1;
-                                if (size >= Integer.MAX_VALUE) {
-                                  LOG.warn(
-                                      "Entry with size {} MiBs inserted into 
the cache. This is larger than the maximum individual entry size of {} MiBs. 
The cache will under report its memory usage by the difference. This may lead 
to OutOfMemoryErrors.",
-                                      (size / 1048576L) + 1,
-                                      2 * WEIGHT_RATIO * 1024);
-                                  return Integer.MAX_VALUE;
-                                }
-                                return (int) size;
-                              }
-                            }))
-                .getCache());
+    return forMaximumBytes(
+        options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1048576L);

Review comment:
       Swapped to a bit shift.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to