This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 38558ced724 [BEAM-12164]: fix the negative throughput issue (#17461)
38558ced724 is described below

commit 38558ced7243e24aa9029b419da57b18f84b1683
Author: Hengfeng Li <[email protected]>
AuthorDate: Wed Apr 27 03:25:29 2022 +1000

    [BEAM-12164]: fix the negative throughput issue (#17461)
---
 .../spanner/changestreams/restriction/ThroughputEstimator.java    | 3 ++-
 .../changestreams/restriction/ThroughputEstimatorTest.java        | 8 ++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
index 513e742dbde..ac55bf9985a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
@@ -91,6 +91,7 @@ public class ThroughputEstimator implements Serializable {
     }
     return bytesInQueue
         .divide(BigDecimal.valueOf(queue.size()), MathContext.DECIMAL128)
+        .max(BigDecimal.ZERO)
         .doubleValue();
   }
 
@@ -103,7 +104,7 @@ public class ThroughputEstimator implements Serializable {
       // Remove the element if the timestamp of the first element is beyond
       // the time range to look backward.
       ImmutablePair<Timestamp, BigDecimal> pair = queue.remove();
-      bytesInQueue = bytesInQueue.subtract(pair.getRight());
+      bytesInQueue = 
bytesInQueue.subtract(pair.getRight()).max(BigDecimal.ZERO);
     }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
index 1c1282dd446..f163f5c90d8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
@@ -110,6 +110,14 @@ public class ThroughputEstimatorTest {
     assertEquals(want.doubleValue(), actual, DELTA);
   }
 
+  @Test
+  public void testThroughputShouldNotBeNegative() {
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(0, 0), -10);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(1, 0), 10);
+    double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(0, 0));
+    assertEquals(0D, actual, DELTA);
+  }
+
   private List<ImmutablePair<Timestamp, Long>> generateTestData(
       int size, int startSeconds, int endSeconds, long maxBytes) {
     Random random = new Random();

Reply via email to