thiagotnunes commented on code in PR #17200:
URL: https://github.com/apache/beam/pull/17200#discussion_r843356034
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampUtils.java:
##########
@@ -30,6 +30,11 @@
BigDecimal.valueOf(Timestamp.MIN_VALUE.getSeconds());
private static final int NANOS_PER_SECOND = (int) TimeUnit.SECONDS.toNanos(1);
+ /** This interface is only used for replacing now() with a constant timestamp for testing. */
+ public interface TimeFunction {
Review Comment:
nit: We could use a `java.util.function.Supplier<Timestamp>` instead of
creating a new interface.
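To illustrate the suggestion, here is a minimal sketch of taking the clock as a `Supplier`. It uses `java.time.Instant` as a stand-in so it compiles on its own; in the PR the type parameter would be `com.google.cloud.Timestamp`. The class name `Clocked` and its method are hypothetical and not part of the PR.

```java
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical stand-in for the class that currently depends on TimeFunction.
final class Clocked {
  // The clock is supplied by the caller, so no dedicated interface is needed.
  private final Supplier<Instant> now;

  Clocked(Supplier<Instant> now) {
    this.now = now;
  }

  // Returns whatever the injected clock reports.
  Instant currentTime() {
    return now.get();
  }
}
```

Production code could pass a method reference such as `Instant::now`, while a test passes a lambda that returns a constant value.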
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java:
##########
@@ -98,6 +102,7 @@ public ReadChangeStreamPartitionDoFn(
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.metrics = metrics;
+ this.throughputEstimator = new ThroughputEstimator();
Review Comment:
Can we inject the throughput estimator instead of creating it in the
constructor? This makes it easier to test (by injecting a mock, for instance).
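A rough sketch of what constructor injection could look like. `Estimator` and `PartitionReader` are hypothetical, trimmed-down stand-ins for `ThroughputEstimator` and `ReadChangeStreamPartitionDoFn`; the real classes have more dependencies and different method signatures.

```java
// Hypothetical stand-in for ThroughputEstimator.
class Estimator {
  private double total;

  void update(double bytes) {
    total += bytes;
  }

  double total() {
    return total;
  }
}

// Hypothetical stand-in for the DoFn: the estimator is passed in, not created here.
final class PartitionReader {
  private final Estimator estimator;

  PartitionReader(Estimator estimator) {
    this.estimator = estimator;
  }

  // Forwards the size of each processed record to the injected estimator.
  void onRecord(double bytes) {
    estimator.update(bytes);
  }
}
```

With this shape, a test can hand in a mock or a recording subclass and assert on the calls it receives, without touching the real estimator.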
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** An estimator to provide an estimate on the throughput of the outputted elements. */
+public class ThroughputEstimator implements Serializable {
+
+ private static class Pair<K, V> {
+ private final K first;
+ private final V second;
+
+ public Pair(K first, V second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ public K getFirst() {
+ return first;
+ }
+
+ public V getSecond() {
+ return second;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("first: %s, second: %s", first, second);
+ }
+ }
+
+ private static final long serialVersionUID = -3597929310338724800L;
+ // The start time of each per-second window.
+ private Timestamp startTimeOfCurrentWindow;
+ // The bytes of the current window.
+ private double bytesInCurrentWindow;
+ // The number of seconds to look in the past.
+ private final int numOfSeconds = 60;
+ // The total bytes of all windows in the queue.
+ private double bytesInQueue;
+ // The queue holds a number of windows in the past in order to calculate
+ // a rolling windowing throughput.
+ private Queue<Pair<Timestamp, Double>> queue;
+
+ public ThroughputEstimator() {
+ queue = new ArrayDeque<>();
+ }
+
+ /**
+ * Updates the estimator with the bytes of records.
+ *
+ * @param timeOfRecords the committed timestamp of the records
+ * @param bytes the total bytes of the records
+ */
+ public void update(Timestamp timeOfRecords, double bytes) {
+ if (startTimeOfCurrentWindow == null) {
+ bytesInCurrentWindow = bytes;
+ startTimeOfCurrentWindow = timeOfRecords;
+ return;
+ }
+
+ if (timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1) {
Review Comment:
Sorry, I am still missing something here. I thought we would add to the
current window with `bytesInCurrentWindow += bytes`, but if the seconds
are equal, it looks like we go to the `else` branch and add a new pair to the
queue. Or am I getting this wrong?
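For reference, this is roughly the behavior I would expect, written as a self-contained sketch. It is not the PR's code: it uses plain `long` seconds instead of `Timestamp`, stores only the byte totals of closed windows, and the class and accessor names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of per-second window accumulation.
final class PerSecondWindows {
  // Byte totals of windows that have already been closed.
  private final Queue<Double> closedWindows = new ArrayDeque<>();
  private boolean started = false;
  private long startSecondOfCurrentWindow;
  private double bytesInCurrentWindow;

  void update(long second, double bytes) {
    if (!started) {
      // First record: open the first window.
      started = true;
      startSecondOfCurrentWindow = second;
      bytesInCurrentWindow = bytes;
      return;
    }
    if (second < startSecondOfCurrentWindow + 1) {
      // Same one-second window: accumulate into it.
      bytesInCurrentWindow += bytes;
    } else {
      // Later second: close the current window and open a new one.
      closedWindows.add(bytesInCurrentWindow);
      startSecondOfCurrentWindow = second;
      bytesInCurrentWindow = bytes;
    }
  }

  int closedWindowCount() {
    return closedWindows.size();
  }

  double bytesInCurrentWindow() {
    return bytesInCurrentWindow;
  }
}
```

Two records in the same second grow the current window here, and only a record from a later second pushes a closed window onto the queue.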
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeThat;
+
+import com.google.cloud.Timestamp;
+import com.pholser.junit.quickcheck.From;
+import com.pholser.junit.quickcheck.Property;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TimestampGenerator;
+
+public class ThroughputEstimatorTest {
+ private static final double DELTA = 1e-10;
+
+ @Property
+ public void testThroughputCalculation(@From(TimestampGenerator.class) Timestamp startTimestamp) {
Review Comment:
I think this specific test does not need to be a property-based case, but we
could create other test cases, such as:
1. Generate random pairs of timestamp / bytes within a 60-second window and
verify that the bytes are correct
2. Generate random pairs of timestamp / bytes within a 300-second window
and verify the sum at the boundaries
Here is an example of how I would do #1:
```java
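// Assumes a static import of org.hamcrest.Matchers.equalTo, imports of java.util.List,
// java.util.stream.Collectors and com.pholser.junit.quickcheck.generator.InRange,
// and an `estimator` field holding the ThroughputEstimator under test.
@Property
public void testThroughputIsAccumulatedWithin60SecondsWindow(
    List<@InRange(minLong = 0L, maxLong = 60000000L) Long> micros,
    List<Double> bytes) {
  assumeThat(micros.size(), equalTo(bytes.size()));
  // Skip empty inputs: the code below reads timestamps.get(0).
  assumeThat(micros.size(), greaterThanOrEqualTo(1));
  micros.sort(Long::compareTo);
  final List<Timestamp> timestamps =
      micros.stream().map(Timestamp::ofTimeMicroseconds).collect(Collectors.toList());
  final Double expectedSum = bytes.stream().reduce(Double::sum).get();
  for (int i = 0; i < timestamps.size(); i++) {
    estimator.update(timestamps.get(i), bytes.get(i));
  }
  final double actualSum =
      estimator.getFrom(
          Timestamp.ofTimeSecondsAndNanos(
              timestamps.get(0).getSeconds() + 1, timestamps.get(0).getNanos()));
  assertEquals(expectedSum, actualSum, DELTA);
}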
```