This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e1e6bdb2947 Change PeriodicSequence to report backlog accurately (#32505)
e1e6bdb2947 is described below

commit e1e6bdb294718b2f7430eeec5e055ddd15bf9f8c
Author: Sam Whittle <[email protected]>
AuthorDate: Fri Sep 20 11:26:16 2024 +0200

    Change PeriodicSequence to report backlog accurately (#32505)
---
 .../beam/sdk/transforms/PeriodicSequence.java      | 34 +++++++++++++++++++++-
 .../beam/sdk/transforms/PeriodicSequenceTest.java  | 22 ++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index 12cbecd04b0..9ad3141f966 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -164,7 +165,9 @@ public class PeriodicSequence
 
     @Override
     public IsBounded isBounded() {
-      return IsBounded.BOUNDED;
+      return range.getTo() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()
+          ? IsBounded.UNBOUNDED
+          : IsBounded.BOUNDED;
     }
 
     @Override
@@ -213,6 +216,13 @@ public class PeriodicSequence
       return null;
     }
 
+    @GetSize
+    public double getSize(
+        @Element SequenceDefinition sequence, @Restriction OffsetRange 
offsetRange) {
+      long nowMilliSec = Instant.now().getMillis();
+      return sequenceBacklogBytes(sequence.durationMilliSec, nowMilliSec, 
offsetRange);
+    }
+
     @ProcessElement
     public ProcessContinuation processElement(
         @Element SequenceDefinition srcElement,
@@ -257,4 +267,26 @@ public class PeriodicSequence
   public PCollection<Instant> expand(PCollection<SequenceDefinition> input) {
     return input.apply(ParDo.of(new PeriodicSequenceFn()));
   }
+
+  private static final int ENCODED_INSTANT_BYTES = 8;
+
+  private static long ceilDiv(long a, long b) {
+    long result = Math.floorDiv(a, b);
+    if (a % b != 0) {
+      ++result;
+    }
+    return result;
+  }
+
+  @VisibleForTesting
+  static long sequenceBacklogBytes(
+      long durationMilliSec, long nowMilliSec, OffsetRange offsetRange) {
+    // Find the # of outputs expected for overlap of offsetRange and [-inf, 
now)
+    long start = ceilDiv(offsetRange.getFrom(), durationMilliSec);
+    long end = ceilDiv(Math.min(nowMilliSec, offsetRange.getTo() - 1), 
durationMilliSec);
+    if (start >= end) {
+      return 0;
+    }
+    return ENCODED_INSTANT_BYTES * (end - start);
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
index 3ace145eba8..541a7093387 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -136,4 +137,25 @@ public class PeriodicSequenceTest {
 
     p.run().waitUntilFinish();
   }
+
+  @Test
+  public void testBacklogBytes() {
+    assertEquals(
+        0, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(100, 
Long.MAX_VALUE)));
+    assertEquals(
+        8, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(90, 
Long.MAX_VALUE)));
+    assertEquals(
+        0, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(91, 
Long.MAX_VALUE)));
+    assertEquals(
+        8, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(89, 
Long.MAX_VALUE)));
+    assertEquals(
+        16, PeriodicSequence.sequenceBacklogBytes(10, 101, new OffsetRange(81, 
Long.MAX_VALUE)));
+    assertEquals(
+        8 * 10000 / 100,
+        PeriodicSequence.sequenceBacklogBytes(100, 10000, new OffsetRange(0, 
Long.MAX_VALUE)));
+    assertEquals(
+        0, PeriodicSequence.sequenceBacklogBytes(10, 10000, new 
OffsetRange(10011, 10025)));
+    assertEquals(
+        8, PeriodicSequence.sequenceBacklogBytes(10, 10100, new 
OffsetRange(10011, 10025)));
+  }
 }

Reply via email to