This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b666c6434dc [Dataflow Streaming] Make SideInputCache bytes and expiry configurable (#29871)
b666c6434dc is described below
commit b666c6434dc6e15079b10bdaf1d6a88b530bb714
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Jan 9 03:40:52 2024 -0800
[Dataflow Streaming] Make SideInputCache bytes and expiry configurable (#29871)
Co-authored-by: Arun Pandian <[email protected]>
---
.../dataflow/options/DataflowPipelineDebugOptions.java | 16 ++++++++++++++--
.../runners/dataflow/worker/StreamingDataflowWorker.java | 2 +-
.../worker/streaming/sideinput/SideInputCache.java | 11 ++++++-----
.../streaming/sideinput/SideInputStateFetcher.java | 6 ++++--
.../streaming/sideinput/SideInputStateFetcherTest.java | 14 +++++++++++---
5 files changed, 36 insertions(+), 13 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index f8649a1f0f3..290418bd1cb 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -239,8 +239,8 @@ public interface DataflowPipelineDebugOptions
/**
* The size of the worker's in-memory cache, in megabytes.
*
- * <p>Currently, this cache is used for storing read values of side inputs. as well as the state
- * for streaming jobs.
+ * <p>Currently, this cache is used for storing read values of side inputs in batch as well as the
+ * user state for streaming jobs.
*/
@Description("The size of the worker's in-memory cache, in megabytes.")
@Default.Integer(100)
@@ -248,6 +248,18 @@ public interface DataflowPipelineDebugOptions
void setWorkerCacheMb(Integer value);
+ @Description("The size of the streaming worker's side input cache, in megabytes.")
+ @Default.Integer(100)
+ Integer getStreamingSideInputCacheMb();
+
+ void setStreamingSideInputCacheMb(Integer value);
+
+ @Description("The expiry for streaming worker's side input cache entries, in milliseconds.")
+ @Default.Integer(60 * 1000) // 1 minute
+ Integer getStreamingSideInputCacheExpirationMillis();
+
+ void setstreamingSideInputCacheExpirationMillis(Integer value);
+
/**
* The amount of time before UnboundedReaders are considered idle and closed
during streaming
* execution.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 0ddafa25f86..f68e5ba26c7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -424,7 +424,7 @@ public class StreamingDataflowWorker {
this.metricTrackingWindmillServer =
new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor,
windmillServiceEnabled);
this.metricTrackingWindmillServer.start();
- this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer);
+ this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer, options);
this.clientId = clientIdGenerator.nextLong();
for (MapTask mapTask : mapTasks) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
index 721c477435e..beb7c361d95 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
@@ -23,6 +23,7 @@ import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -40,8 +41,7 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Weigher;
@CheckReturnValue
final class SideInputCache {
- private static final long MAXIMUM_CACHE_WEIGHT = 100000000; /* 100 MB */
- private static final long CACHE_ENTRY_EXPIRY_MINUTES = 1L;
+ private static final long BYTES_PER_MB = 1024 * 1024;
private final Cache<Key<?>, SideInput<?>> sideInputCache;
@@ -49,11 +49,12 @@ final class SideInputCache {
this.sideInputCache = sideInputCache;
}
- static SideInputCache create() {
+ static SideInputCache create(DataflowPipelineDebugOptions options) {
return new SideInputCache(
CacheBuilder.newBuilder()
- .maximumWeight(MAXIMUM_CACHE_WEIGHT)
- .expireAfterWrite(CACHE_ENTRY_EXPIRY_MINUTES, TimeUnit.MINUTES)
+ .maximumWeight(options.getStreamingSideInputCacheMb() * BYTES_PER_MB)
+ .expireAfterWrite(
+ options.getStreamingSideInputCacheExpirationMillis(), TimeUnit.MILLISECONDS)
.weigher((Weigher<Key<?>, SideInput<?>>) (id, entry) ->
entry.size())
.build());
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
index aa61c421935..e119c571946 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import
org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -61,8 +62,9 @@ public class SideInputStateFetcher {
private final MetricTrackingWindmillServerStub server;
private long bytesRead = 0L;
- public SideInputStateFetcher(MetricTrackingWindmillServerStub server) {
- this(server, SideInputCache.create());
+ public SideInputStateFetcher(
+ MetricTrackingWindmillServerStub server, DataflowPipelineDebugOptions
options) {
+ this(server, SideInputCache.create(options));
}
SideInputStateFetcher(MetricTrackingWindmillServerStub server,
SideInputCache sideInputCache) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
index 1e188da2dd6..d1e2b3ca60b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherTest.java
@@ -32,6 +32,7 @@ import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import
org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
@@ -79,7 +81,9 @@ public class SideInputStateFetcherTest {
@Test
public void testFetchGlobalDataBasic() throws Exception {
- SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
+ SideInputStateFetcher fetcher =
+ new SideInputStateFetcher(
+ server,
PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));
ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(StringUtf8Coder.of())
@@ -147,7 +151,9 @@ public class SideInputStateFetcherTest {
@Test
public void testFetchGlobalDataNull() throws Exception {
- SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
+ SideInputStateFetcher fetcher =
+ new SideInputStateFetcher(
+ server,
PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));
ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(VoidCoder.of())
@@ -302,7 +308,9 @@ public class SideInputStateFetcherTest {
@Test
public void testEmptyFetchGlobalData() throws Exception {
- SideInputStateFetcher fetcher = new SideInputStateFetcher(server);
+ SideInputStateFetcher fetcher =
+ new SideInputStateFetcher(
+ server,
PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class));
ByteString encodedIterable = ByteString.EMPTY;