This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 38558ced724 [BEAM-12164]: fix the negative throughput issue (#17461)
38558ced724 is described below
commit 38558ced7243e24aa9029b419da57b18f84b1683
Author: Hengfeng Li <[email protected]>
AuthorDate: Wed Apr 27 03:25:29 2022 +1000
[BEAM-12164]: fix the negative throughput issue (#17461)
---
.../spanner/changestreams/restriction/ThroughputEstimator.java | 3 ++-
.../changestreams/restriction/ThroughputEstimatorTest.java | 8 ++++++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
index 513e742dbde..ac55bf9985a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
@@ -91,6 +91,7 @@ public class ThroughputEstimator implements Serializable {
}
return bytesInQueue
.divide(BigDecimal.valueOf(queue.size()), MathContext.DECIMAL128)
+ .max(BigDecimal.ZERO)
.doubleValue();
}
@@ -103,7 +104,7 @@ public class ThroughputEstimator implements Serializable {
// Remove the element if the timestamp of the first element is beyond
// the time range to look backward.
ImmutablePair<Timestamp, BigDecimal> pair = queue.remove();
- bytesInQueue = bytesInQueue.subtract(pair.getRight());
+ bytesInQueue =
bytesInQueue.subtract(pair.getRight()).max(BigDecimal.ZERO);
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
index 1c1282dd446..f163f5c90d8 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
@@ -110,6 +110,14 @@ public class ThroughputEstimatorTest {
assertEquals(want.doubleValue(), actual, DELTA);
}
+ @Test
+ public void testThroughputShouldNotBeNegative() {
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(0, 0), -10);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(1, 0), 10);
+ double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(0, 0));
+ assertEquals(0D, actual, DELTA);
+ }
+
private List<ImmutablePair<Timestamp, Long>> generateTestData(
int size, int startSeconds, int endSeconds, long maxBytes) {
Random random = new Random();