This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e1e6bdb2947 Change PeriodicSequence to report backlog accurately (#32505)
e1e6bdb2947 is described below
commit e1e6bdb294718b2f7430eeec5e055ddd15bf9f8c
Author: Sam Whittle <[email protected]>
AuthorDate: Fri Sep 20 11:26:16 2024 +0200
Change PeriodicSequence to report backlog accurately (#32505)
---
.../beam/sdk/transforms/PeriodicSequence.java | 34 +++++++++++++++++++++-
.../beam/sdk/transforms/PeriodicSequenceTest.java | 22 ++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index 12cbecd04b0..9ad3141f966 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -164,7 +165,9 @@ public class PeriodicSequence
@Override
public IsBounded isBounded() {
- return IsBounded.BOUNDED;
+ // An end offset equal to BoundedWindow.TIMESTAMP_MAX_VALUE is treated as "no end", so the
+ // restriction reports UNBOUNDED; any other end offset reports BOUNDED. NOTE(review):
+ // presumably this is the sentinel used when the sequence has no stop time; confirm.
+ return range.getTo() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()
+ ? IsBounded.UNBOUNDED
+ : IsBounded.BOUNDED;
}
@Override
@@ -213,6 +216,13 @@ public class PeriodicSequence
return null;
}
+    /**
+     * Reports the restriction's current backlog in bytes, as computed by {@code
+     * sequenceBacklogBytes} from the element's output interval and the current wall-clock time.
+     */
+    @GetSize
+    public double getSize(
+        @Element SequenceDefinition sequence, @Restriction OffsetRange offsetRange) {
+      return sequenceBacklogBytes(
+          sequence.durationMilliSec, Instant.now().getMillis(), offsetRange);
+    }
+
@ProcessElement
public ProcessContinuation processElement(
@Element SequenceDefinition srcElement,
@@ -257,4 +267,26 @@ public class PeriodicSequence
public PCollection<Instant> expand(PCollection<SequenceDefinition> input) {
return input.apply(ParDo.of(new PeriodicSequenceFn()));
}
+
+  /** Number of bytes charged per pending output (the encoded size of one {@code Instant}). */
+  private static final int ENCODED_INSTANT_BYTES = 8;
+
+  /**
+   * Returns {@code a / b} rounded toward positive infinity.
+   *
+   * <p>The floor quotient is bumped by one whenever the division is inexact, which yields the
+   * ceiling for any combination of operand signs.
+   */
+  private static long ceilDiv(long a, long b) {
+    long result = Math.floorDiv(a, b);
+    if (a % b != 0) {
+      ++result;
+    }
+    return result;
+  }
+
+  /**
+   * Estimates the backlog, in bytes, of an {@link OffsetRange} restriction at a given time.
+   *
+   * <p>Counts the multiples of {@code durationMilliSec} that lie in {@code [from, to)} and are
+   * strictly before {@code nowMilliSec}, charging {@link #ENCODED_INSTANT_BYTES} for each.
+   *
+   * @param durationMilliSec interval between outputs; assumed positive (TODO confirm callers)
+   * @param nowMilliSec current time in epoch millis; outputs at or after it are not yet due
+   * @param offsetRange restriction whose {@code getTo()} is exclusive
+   * @return the number of pending outputs times {@link #ENCODED_INSTANT_BYTES}, or 0 if none
+   */
+  @VisibleForTesting
+  static long sequenceBacklogBytes(
+      long durationMilliSec, long nowMilliSec, OffsetRange offsetRange) {
+    // Output k * d is pending iff from <= k * d < min(now, to), i.e.
+    // ceil(from / d) <= k < ceil(min(now, to) / d). getTo() is already an exclusive bound, so it
+    // is used as-is; decrementing it would miss the output at to - 1 whenever to - 1 is a
+    // multiple of d.
+    long start = ceilDiv(offsetRange.getFrom(), durationMilliSec);
+    long end = ceilDiv(Math.min(nowMilliSec, offsetRange.getTo()), durationMilliSec);
+    if (start >= end) {
+      return 0;
+    }
+    return ENCODED_INSTANT_BYTES * (end - start);
+  }
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
index 3ace145eba8..541a7093387 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -136,4 +137,25 @@ public class PeriodicSequenceTest {
p.run().waitUntilFinish();
}
+
+  @Test
+  public void testBacklogBytes() {
+    // Pending outputs are the multiples of the duration inside the range that are strictly
+    // before "now"; each one is charged 8 bytes.
+    final long unbounded = Long.MAX_VALUE;
+
+    // d=10, now=100, range from 100: the candidate 100 is not before now.
+    assertEquals(
+        0, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(100, unbounded)));
+    // Range from 90: only 90 is pending.
+    assertEquals(
+        8, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(90, unbounded)));
+    // Range from 91: the next candidate, 100, is not before now.
+    assertEquals(
+        0, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(91, unbounded)));
+    // Range from 89: only 90 is pending.
+    assertEquals(
+        8, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(89, unbounded)));
+    // now=101, range from 81: 90 and 100 are pending.
+    assertEquals(
+        16, PeriodicSequence.sequenceBacklogBytes(10, 101, new OffsetRange(81, unbounded)));
+    // d=100, now=10000, range from 0: outputs 0, 100, ..., 9900 are pending.
+    assertEquals(
+        8 * 10000 / 100,
+        PeriodicSequence.sequenceBacklogBytes(100, 10000, new OffsetRange(0, unbounded)));
+    // Bounded range entirely after now: nothing is pending.
+    assertEquals(
+        0, PeriodicSequence.sequenceBacklogBytes(10, 10000, new OffsetRange(10011, 10025)));
+    // Bounded range entirely before now: only 10020 lies in [10011, 10025).
+    assertEquals(
+        8, PeriodicSequence.sequenceBacklogBytes(10, 10100, new OffsetRange(10011, 10025)));
+  }
}