This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b666c6434dc [Dataflow Streaming] Make SideInputCache bytes and expiry configurable (#29871)
b666c6434dc is described below

commit b666c6434dc6e15079b10bdaf1d6a88b530bb714
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Jan 9 03:40:52 2024 -0800

    [Dataflow Streaming] Make SideInputCache bytes and expiry configurable (#29871)
    
    Co-authored-by: Arun Pandian <[email protected]>
---
 .../dataflow/options/DataflowPipelineDebugOptions.java   | 16 ++++++++++++++--
 .../runners/dataflow/worker/StreamingDataflowWorker.java |  2 +-
 .../worker/streaming/sideinput/SideInputCache.java       | 11 ++++++-----
 .../streaming/sideinput/SideInputStateFetcher.java       |  6 ++++--
 .../streaming/sideinput/SideInputStateFetcherTest.java   | 14 +++++++++++---
 5 files changed, 36 insertions(+), 13 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index f8649a1f0f3..290418bd1cb 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -239,8 +239,8 @@ public interface DataflowPipelineDebugOptions
   /**
    * The size of the worker's in-memory cache, in megabytes.
    *
-   * <p>Currently, this cache is used for storing read values of side inputs. 
as well as the state
-   * for streaming jobs.
+   * <p>Currently, this cache is used for storing read values of side inputs 
in batch as well as the
+   * user state for streaming jobs.
    */
   @Description("The size of the worker's in-memory cache, in megabytes.")
   @Default.Integer(100)
@@ -248,6 +248,18 @@ public interface DataflowPipelineDebugOptions
 
   void setWorkerCacheMb(Integer value);
 
+  @Description("The size of the streaming worker's side input cache, in 
megabytes.")
+  @Default.Integer(100)
+  Integer getStreamingSideInputCacheMb();
+
+  void setStreamingSideInputCacheMb(Integer value);
+
+  @Description("The expiry for streaming worker's side input cache entries, in 
milliseconds.")
+  @Default.Integer(60 * 1000) // 1 minute
+  Integer getStreamingSideInputCacheExpirationMillis();
+
+  void setStreamingSideInputCacheExpirationMillis(Integer value);
+
   /**
    * The amount of time before UnboundedReaders are considered idle and closed 
during streaming
    * execution.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 0ddafa25f86..f68e5ba26c7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -424,7 +424,7 @@ public class StreamingDataflowWorker {
     this.metricTrackingWindmillServer =
         new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, 
windmillServiceEnabled);
     this.metricTrackingWindmillServer.start();
-    this.sideInputStateFetcher = new 
SideInputStateFetcher(metricTrackingWindmillServer);
+    this.sideInputStateFetcher = new 
SideInputStateFetcher(metricTrackingWindmillServer, options);
     this.clientId = clientIdGenerator.nextLong();
 
     for (MapTask mapTask : mapTasks) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
index 721c477435e..beb7c361d95 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -40,8 +41,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Weigher;
 @CheckReturnValue
 final class SideInputCache {
 
-  private static final long MAXIMUM_CACHE_WEIGHT = 100000000; /* 100 MB */
-  private static final long CACHE_ENTRY_EXPIRY_MINUTES = 1L;
+  private static final long BYTES_PER_MB = 1024 * 1024;
 
   private final Cache<Key<?>, SideInput<?>> sideInputCache;
 
@@ -49,11 +49,12 @@ final class SideInputCache {
     this.sideInputCache = sideInputCache;
   }
 
-  static SideInputCache create() {
+  static SideInputCache create(DataflowPipelineDebugOptions options) {
     return new SideInputCache(
         CacheBuilder.newBuilder()
-            .maximumWeight(MAXIMUM_CACHE_WEIGHT)
-            .expireAfterWrite(CACHE_ENTRY_EXPIRY_MINUTES, TimeUnit.MINUTES)
+            .maximumWeight(options.getStreamingSideInputCacheMb() * 
BYTES_PER_MB)
+            .expireAfterWrite(
+                options.getStreamingSideInputCacheExpirationMillis(), 
TimeUnit.MILLISECONDS)
             .weigher((Weigher<Key<?>, SideInput<?>>) (id, entry) -> 
entry.size())
             .build());
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
index aa61c421935..e119c571946 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import 
org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -61,8 +62,9 @@ public class SideInputStateFetcher {
   private final MetricTrackingWindmillServerStub server;
   private long bytesRead = 0L;
 
-  public SideInputStateFetcher(MetricTrackingWindmillServerStub server) {
-    this(server, SideInputCache.create());
+  public SideInputStateFetcher(
+      MetricTrackingWindmillServerStub server, DataflowPipelineDebugOptions 
options) {
+    this(server, SideInputCache.create(options));
   }
 
   SideInputStateFetcher(MetricTrackingWindmillServerStub server, 
SideInputCache sideInputCache) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
index 1e188da2dd6..d1e2b3ca60b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
@@ -32,6 +32,7 @@ import java.io.Closeable;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import 
org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.sdk.coders.Coder;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
@@ -79,7 +81,9 @@ public class SideInputStateFetcherTest {
 
   @Test
   public void testFetchGlobalDataBasic() throws Exception {
-    SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
+    SideInputStateFetcher fetcher =
+        new SideInputStateFetcher(
+            server, 
PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));
 
     ByteStringOutputStream stream = new ByteStringOutputStream();
     ListCoder.of(StringUtf8Coder.of())
@@ -147,7 +151,9 @@ public class SideInputStateFetcherTest {
 
   @Test
   public void testFetchGlobalDataNull() throws Exception {
-    SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
+    SideInputStateFetcher fetcher =
+        new SideInputStateFetcher(
+            server, 
PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));
 
     ByteStringOutputStream stream = new ByteStringOutputStream();
     ListCoder.of(VoidCoder.of())
@@ -302,7 +308,9 @@ public class SideInputStateFetcherTest {
 
   @Test
   public void testEmptyFetchGlobalData() throws Exception {
-    SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
+    SideInputStateFetcher fetcher =
+        new SideInputStateFetcher(
+            server, 
PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));
 
     ByteString encodedIterable = ByteString.EMPTY;
 

Reply via email to