This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 15f7f02a79d Add UnboundedReaderMaxReadTimeMs to DataflowPipelineDebugOptions, deprecate UnboundedReaderMaxReadTimeSec (#31091)
15f7f02a79d is described below
commit 15f7f02a79d81786083b8cff5e69ed72b361145f
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Fri Apr 26 14:48:31 2024 +0200
Add UnboundedReaderMaxReadTimeMs to DataflowPipelineDebugOptions, deprecate UnboundedReaderMaxReadTimeSec (#31091)
---
.../options/DataflowPipelineDebugOptions.java | 27 +++++++++++++++++++++-
.../dataflow/worker/WorkerCustomSources.java | 5 ++--
.../dataflow/worker/WorkerCustomSourcesTest.java | 6 ++---
3 files changed, 31 insertions(+), 7 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 30496dec296..7a5284151b9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -216,14 +216,39 @@ public interface DataflowPipelineDebugOptions
void setReaderCacheTimeoutSec(Integer value);
- /** The max amount of time an UnboundedReader is consumed before checkpointing. */
+ /**
+ * The max amount of time an UnboundedReader is consumed before checkpointing.
+ *
+ * @deprecated use {@link DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeMs()} instead
+ */
@Description(
"The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.")
+ @Deprecated
@Default.Integer(10)
Integer getUnboundedReaderMaxReadTimeSec();
void setUnboundedReaderMaxReadTimeSec(Integer value);
+ /** The max amount of time an UnboundedReader is consumed before checkpointing. */
+ @Description(
+ "The max amount of time before an UnboundedReader is consumed before checkpointing, in millis.")
+ @Default.InstanceFactory(UnboundedReaderMaxReadTimeFactory.class)
+ Integer getUnboundedReaderMaxReadTimeMs();
+
+ void setUnboundedReaderMaxReadTimeMs(Integer value);
+
+ /**
+ * Sets Integer value based on old, deprecated field ({@link
+ * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}).
+ */
+ final class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory<Integer> {
+ @Override
+ public Integer create(PipelineOptions options) {
+ DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+ return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000;
+ }
+ }
+
/** The max elements read from an UnboundedReader before checkpointing. */
@Description("The max elements read from an UnboundedReader before checkpointing. ")
@Default.Integer(10 * 1000)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 8c086016ee9..b965110b3ef 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -796,9 +796,8 @@ public class WorkerCustomSources {
this.context = context;
this.started = started;
DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- this.endTime =
- Instant.now()
- .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec()));
+ long maxReadTimeMs = debugOptions.getUnboundedReaderMaxReadTimeMs();
+ this.endTime = Instant.now().plus(Duration.millis(maxReadTimeMs));
this.maxElems = debugOptions.getUnboundedReaderMaxElements();
this.backoffFactory =
FluentBackoff.DEFAULT
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index d451ec093f7..cc9e6da4a73 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -645,10 +645,10 @@ public class WorkerCustomSourcesTest {
numReadOnThisIteration++;
}
Instant afterReading = Instant.now();
- long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec();
+ long maxReadMs = debugOptions.getUnboundedReaderMaxReadTimeMs();
assertThat(
- new Duration(beforeReading, afterReading).getStandardSeconds(),
- lessThanOrEqualTo(maxReadSec + 1));
+ new Duration(beforeReading, afterReading).getMillis(),
+ lessThanOrEqualTo(maxReadMs + 1000L));
assertThat(
numReadOnThisIteration,
lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));