This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c65e777d6c6 Optimize PGBK table to only update cache when there is a 
large enough size change. #21250 (#25219)
c65e777d6c6 is described below

commit c65e777d6c62d25184b91fa50d7281411c22f2ec
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Feb 3 14:45:15 2023 -0800

    Optimize PGBK table to only update cache when there is a large enough size 
change. #21250 (#25219)
    
    * Optimize PGBK table to only update cache when there is a large enough 
size change. #21250
    
    This prevents an expensive scenario where a user is outputting lots of 
small values (e.g. ints) to be precombined and hence takes little to no space 
to store so updating the cache provides little value.
    
    Note the 5-10x change for all types except for unique keys. Some early 
profiles show that there is an issue with the G1 garbage collector when storing 
so many small values that the GC management overhead dominates 75% of the 
execution which requires further investigation.
    
    Before:
    ```
    Benchmark                                                 (distribution)  
(globallyWindowed)   Mode  Cnt   Score   Error  Units
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform    
            true  thrpt    5   8.306 ± 1.255  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform    
           false  thrpt    5   7.849 ± 0.476  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal    
            true  thrpt    5  10.575 ± 1.295  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal    
           false  thrpt    5  10.772 ± 0.141  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey    
            true  thrpt    5   9.131 ± 2.761  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey    
           false  thrpt    5   8.302 ± 1.078  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys    
            true  thrpt    5   3.899 ± 1.737  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys    
           false  thrpt    5   4.203 ± 2.170  ops/s
    
    ```
    
    After:
    ```
    Benchmark                                                 (distribution)  
(globallyWindowed)   Mode  Cnt   Score   Error  Units
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform    
            true  thrpt    5  88.740 ± 8.925  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform    
           false  thrpt    5  76.005 ± 5.150  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal    
            true  thrpt    5  43.388 ± 1.966  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal    
           false  thrpt    5  37.804 ± 7.177  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey    
            true  thrpt    5  84.881 ± 5.040  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey    
           false  thrpt    5  74.183 ± 2.063  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys    
            true  thrpt    5   5.567 ± 4.068  ops/s
    PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys    
           false  thrpt    5   6.957 ± 1.508  ops/s
    ```
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  2 +-
 .../jmh/PrecombineGroupingTableBenchmark.java      | 87 +++++++++++++---------
 .../java/org/apache/beam/fn/harness/Caches.java    | 43 ++++++++++-
 .../beam/fn/harness/PrecombineGroupingTable.java   | 10 +--
 .../fn/harness/PrecombineGroupingTableTest.java    | 33 ++++----
 5 files changed, 117 insertions(+), 58 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4d9c2109b75..414b3ccdc55 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1553,7 +1553,7 @@ class BeamModulePlugin implements Plugin<Project> {
             if (project.file("/opt/cprof/profiler_java_agent.so").exists()) {
               def gcpProject = project.findProperty('gcpProject') ?: 
'apache-beam-testing'
               def userName = 
System.getProperty("user.name").toLowerCase().replaceAll(" ", "_")
-              jvmArgs 
'-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" 
+ project.getProperty("benchmark").toLowerCase() + '_' + 
System.currentTimeMillis() + ',-cprof_project_id=' + gcpProject + 
',-cprof_zone_name=us-central1-a'
+              jvmArgs 
'-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" 
+ project.getProperty("benchmark").toLowerCase() + '_' + 
String.format('%1$tY%1$tm%1$td_%1$tH%1$tM%1$tS_%1$tL', 
System.currentTimeMillis()) + ',-cprof_project_id=' + gcpProject + 
',-cprof_zone_name=us-central1-a'
             }
           } else {
             // We filter for only Apache Beam benchmarks to ensure that we 
aren't
diff --git 
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java
 
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java
index 0deaf96f18f..ded8358a10d 100644
--- 
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java
+++ 
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java
@@ -21,7 +21,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import org.apache.beam.fn.harness.Cache;
 import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.Caches.ClearableCache;
 import org.apache.beam.fn.harness.PrecombineGroupingTable;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -36,15 +38,20 @@ import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.infra.Blackhole;
 
 public class PrecombineGroupingTableBenchmark {
   private static final int TOTAL_VALUES = 1_000_000;
+  private static final int KEY_SPACE = 1_000;
 
   @State(Scope.Benchmark)
   public static class SumIntegerBinaryCombine {
     final Combine.BinaryCombineIntegerFn sumInts = Sum.ofIntegers();
     final PipelineOptions options = PipelineOptionsFactory.create();
+
+    final Cache<Object, Object> cache = Caches.fromOptions(options);
+
     List<WindowedValue<KV<String, Integer>>> elements;
 
     @Param({"true", "false"})
@@ -55,51 +62,60 @@ public class PrecombineGroupingTableBenchmark {
 
     @Setup(Level.Trial)
     public void setUp() {
-      // Use a stable seed to ensure consistency across benchmark runs
-      Random random = new Random(-2134890234);
-      elements = new ArrayList<>();
-      switch (distribution) {
-        case "uniform":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            int key = random.nextInt(1000);
-            
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), 
key)));
-          }
-          break;
-        case "normal":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            int key = (int) (random.nextGaussian() * 1000);
-            
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), 
key)));
-          }
-          break;
-        case "hotKey":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            int key;
-            if (random.nextBoolean()) {
-              key = 0;
-            } else {
-              key = random.nextInt(1000);
-            }
-            
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), 
key)));
-          }
-          break;
-        case "uniqueKeys":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i)));
+      this.elements = generateTestData(distribution);
+    }
+  }
+
+  private static List<WindowedValue<KV<String, Integer>>> 
generateTestData(String distribution) {
+    // Use a stable seed to ensure consistency across benchmark runs
+    Random random = new Random(-2134890234);
+    List<WindowedValue<KV<String, Integer>>> elements = new ArrayList<>();
+    switch (distribution) {
+      case "uniform":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          int key = random.nextInt(KEY_SPACE);
+          
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), 
key)));
+        }
+        break;
+      case "normal":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          int key = (int) (random.nextGaussian() * KEY_SPACE);
+          
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), 
key)));
+        }
+        break;
+      case "hotKey":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          int key;
+          if (random.nextBoolean()) {
+            key = -123814201;
+          } else {
+            key = random.nextInt(KEY_SPACE);
           }
-          Collections.shuffle(elements, random);
-          break;
-        default:
-      }
+          
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), 
key)));
+        }
+        break;
+      case "uniqueKeys":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i)));
+        }
+        Collections.shuffle(elements, random);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown distribution: " + 
distribution);
     }
+    return elements;
   }
 
   @Benchmark
+  @Threads(16)
   public void sumIntegerBinaryCombine(SumIntegerBinaryCombine table, Blackhole 
blackhole)
       throws Exception {
+    ClearableCache<Object, Object> cache =
+        new ClearableCache<>(Caches.subCache(table.cache, 
Thread.currentThread().getName()));
     PrecombineGroupingTable<String, Integer, int[]> groupingTable =
         PrecombineGroupingTable.combiningAndSampling(
             table.options,
-            Caches.eternal(),
+            cache,
             table.sumInts,
             StringUtf8Coder.of(),
             .001,
@@ -108,5 +124,6 @@ public class PrecombineGroupingTableBenchmark {
       groupingTable.put(table.elements.get(i), blackhole::consume);
     }
     groupingTable.flush(blackhole::consume);
+    cache.clear();
   }
 }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
index 514b21575b2..5b2330b72f6 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
@@ -53,12 +53,25 @@ public final class Caches {
    */
   @VisibleForTesting static final int WEIGHT_RATIO = 6;
 
+  /** All objects less than or equal to this size will account for 1. */
+  private static final long MIN_OBJECT_SIZE = 1 << WEIGHT_RATIO;
+
+  /**
+   * Objects which change in this amount should always update the cache.
+   *
+   * <p>The limit of 2^16 is chosen to be small enough such that objects will 
be close enough if
+   * they change frequently. Future work could scale these ratios based upon 
the configured cache
+   * size.
+   */
+  private static final long CACHE_SIZE_CHANGE_LIMIT_BYTES = 1 << 16;
+
   private static final MemoryMeter MEMORY_METER =
       MemoryMeter.builder().withGuessing(Guess.BEST).build();
 
   /** The size of a reference. */
   public static final long REFERENCE_SIZE = 8;
 
+  /** Returns the amount of memory in bytes the provided object consumes. */
   public static long weigh(Object o) {
     if (o == null) {
       return REFERENCE_SIZE;
@@ -73,6 +86,25 @@ public final class Caches {
     }
   }
 
+  /**
+   * Returns whether the cache should be updated in the case where the objects 
size has changed.
+   *
+   * <p>Note that this should only be used in the case where the cache is 
being updated very often
+   * in a tight loop and is not a good fit for cases where the object being 
cached is the result of
+   * an expensive operation like a disk read or remote service call.
+   */
+  public static boolean shouldUpdateOnSizeChange(long oldSize, long newSize) {
+    /*
+    Our strategy is three fold:
+    - tiny objects don't impact the cache accounting and count as a size of 
`1` in the cache.
+    - large changes (>= CACHE_SIZE_CHANGE_LIMIT_BYTES) should always update 
the size
+    - all others if the size changed by a factor of 2
+    */
+    return (oldSize > MIN_OBJECT_SIZE || newSize > MIN_OBJECT_SIZE)
+        && ((newSize - oldSize >= CACHE_SIZE_CHANGE_LIMIT_BYTES)
+            || Long.highestOneBit(oldSize) != Long.highestOneBit(newSize));
+  }
+
   /** An eviction listener that reduces the size of entries that are {@link 
Shrinkable}. */
   @VisibleForTesting
   static class ShrinkOnEviction implements RemovalListener<CompositeKey, 
WeightedValue<Object>> {
@@ -184,8 +216,15 @@ public final class Caches {
                     // which is why we set the concurrency level to 1. See
                     // https://github.com/google/guava/issues/3462 for further 
details.
                     //
-                    // The ProcessBundleBenchmark#testStateWithCaching shows 
no noticeable change
-                    // when this parameter is left at the default.
+                    // The PrecombineGroupingTable showed contention here 
since it was working in
+                    // a tight loop. We were able to resolve the contention by 
reducing the
+                    // frequency of updates. Reconsider this value if we could 
solve the maximum
+                    // entry size issue. Note that using 
Runtime.getRuntime().availableProcessors()
+                    // is subject to docker CPU shares issues
+                    // (https://bugs.openjdk.org/browse/JDK-8281181).
+                    //
+                    // We could revisit the caffeine cache library based upon 
reinvestigating
+                    // recursive computeIfAbsent calls since it doesn't have 
this limit.
                     .concurrencyLevel(1)
                     .recordStats(),
                 weightInBytes)
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
index e311322880c..438f6b9d668 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
@@ -128,8 +128,7 @@ public class PrecombineGroupingTable<K, InputT, AccumT>
   private final AtomicLong maxWeight;
   private long weight;
   private final boolean isGloballyWindowed;
-  private long checkFlushCounter;
-  private long checkFlushLimit = -5;
+  private long lastWeightForFlush;
 
   private static final class Key implements Weighted {
     private static final Key INSTANCE = new Key();
@@ -407,12 +406,9 @@ public class PrecombineGroupingTable<K, InputT, AccumT>
           return tableEntry;
         });
 
-    if (checkFlushCounter++ < checkFlushLimit) {
-      return;
-    } else {
-      checkFlushLimit = Math.min(checkFlushLimit + 1, 25);
-      checkFlushCounter = 0;
+    if (Caches.shouldUpdateOnSizeChange(lastWeightForFlush, weight)) {
       flushIfNeeded(receiver);
+      lastWeightForFlush = weight;
     }
   }
 
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java
index 28def2ef6f5..d9fd19c651a 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 
@@ -156,19 +157,19 @@ public class PrecombineGroupingTableTest {
     assertThat(receiver.outputElems, empty());
 
     // Putting in other large keys should cause eviction.
-    table.put(valueInGlobalWindow(KV.of("BBB", 9)), receiver);
+    table.put(valueInGlobalWindow(KV.of("BB", 509)), receiver);
     table.put(valueInGlobalWindow(KV.of("CCC", 11)), receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)), 
valueInGlobalWindow(KV.of("BBB", 9L))));
+            valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)), 
valueInGlobalWindow(KV.of("BB", 509L))));
 
     table.flush(receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
             valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)),
-            valueInGlobalWindow(KV.of("BBB", 9L)),
+            valueInGlobalWindow(KV.of("BB", 509L)),
             valueInGlobalWindow(KV.of("CCC", 11L))));
   }
 
@@ -225,17 +226,20 @@ public class PrecombineGroupingTableTest {
 
     // Insert three compactable values which shouldn't lead to eviction even 
though we are over
     // the maximum size.
-    table.put(valueInGlobalWindow(KV.of("A", 1004)), receiver);
-    table.put(valueInGlobalWindow(KV.of("B", 1004)), receiver);
+    table.put(valueInGlobalWindow(KV.of("A", 804)), receiver);
+    table.put(valueInGlobalWindow(KV.of("B", 904)), receiver);
     table.put(valueInGlobalWindow(KV.of("C", 1004)), receiver);
     assertThat(receiver.outputElems, empty());
 
+    // Ensure that compaction occurred during the insertion of the above 
elements before flushing.
+    assertThat(table.getWeight(), lessThan(804L + 904L + 1004L));
+
     table.flush(receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("A", 1004L / 4)),
-            valueInGlobalWindow(KV.of("B", 1004L / 4)),
+            valueInGlobalWindow(KV.of("A", 804L / 4)),
+            valueInGlobalWindow(KV.of("B", 904L / 4)),
             valueInGlobalWindow(KV.of("C", 1004L / 4))));
   }
 
@@ -254,20 +258,20 @@ public class PrecombineGroupingTableTest {
 
     // Insert three values which even with compaction isn't enough so we evict 
A & B to get
     // under the max weight.
-    table.put(valueInGlobalWindow(KV.of("A", 1001)), receiver);
-    table.put(valueInGlobalWindow(KV.of("B", 1001)), receiver);
+    table.put(valueInGlobalWindow(KV.of("A", 801)), receiver);
+    table.put(valueInGlobalWindow(KV.of("B", 901)), receiver);
     table.put(valueInGlobalWindow(KV.of("C", 1001)), receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("A", 1001L)), 
valueInGlobalWindow(KV.of("B", 1001L))));
+            valueInGlobalWindow(KV.of("A", 801L)), 
valueInGlobalWindow(KV.of("B", 901L))));
 
     table.flush(receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("A", 1001L)),
-            valueInGlobalWindow(KV.of("B", 1001L)),
+            valueInGlobalWindow(KV.of("A", 801L)),
+            valueInGlobalWindow(KV.of("B", 901L)),
             valueInGlobalWindow(KV.of("C", 1001L))));
   }
 
@@ -460,7 +464,10 @@ public class PrecombineGroupingTableTest {
     };
   }
 
-  /** "Estimate" the size of strings by taking the tenth power of their 
length. */
+  /**
+   * Used to simulate very specific compaction/eviction tests under certain 
scenarios instead of
+   * relying on JAMM for size estimation. Strings are 10^length and longs are 
their value.
+   */
   private static class TestSizeEstimator implements SizeEstimator {
     int calls = 0;
 

Reply via email to