thiagotnunes commented on a change in pull request #17200:
URL: https://github.com/apache/beam/pull/17200#discussion_r837097340
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
##########
@@ -187,6 +193,7 @@ public ProcessContinuation run(
changeStreamRecordMapper.toChangeStreamRecords(
updatedPartition, resultSet.getCurrentRowAsStruct(),
resultSet.getMetadata());
+ double recordsBytes = 0D;
Review comment:
Could we refactor this so that the logic for computing `recordsBytes` is
encapsulated in the `throughputEstimator`? I was thinking of something along
the lines of:
```
throughputEstimator.update(record);
```
Within this call, we would calculate the record's size in bytes
(`record.toString().getBytes(StandardCharsets.UTF_8).length;`) and accumulate it.
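A minimal sketch of the suggested shape, in plain Java so it compiles on its own. `RecordSizingEstimator` is a hypothetical name; the record is typed as `Object` because only `toString()` is needed, and the per-second windowing that the real estimator also performs is deliberately left out.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical estimator that sizes records itself instead of taking a precomputed byte count. */
class RecordSizingEstimator {
  private double totalBytes = 0D;

  /** Measures the record as the UTF-8 length of its string form and accumulates it. */
  void update(Object record) {
    totalBytes += record.toString().getBytes(StandardCharsets.UTF_8).length;
  }

  /** Total bytes accumulated so far. */
  double getTotalBytes() {
    return totalBytes;
  }
}
```

With this shape, the caller in `QueryChangeStreamAction` would only hand each record to the estimator and would no longer need its own `recordsBytes` counter.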
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
##########
@@ -146,6 +156,17 @@ public TimestampRange initialRestriction(@Element PartitionMetadata partition) {
return TimestampRange.of(startTimestamp, endTimestamp);
}
+ @GetSize
+ public double getSize(@Element PartitionMetadata partition, @Restriction TimestampRange range)
+ throws Exception {
+ final BigDecimal timeGapInSeconds =
+ BigDecimal.valueOf(newTracker(partition, range).getProgress().getWorkRemaining());
+ final BigDecimal throughput = BigDecimal.valueOf(this.throughputEstimator.get());
+ LOG.debug(
+ "Reported getSize() - remaining work: " + timeGapInSeconds + " throughput:" + throughput);
+ return timeGapInSeconds.multiply(throughput).doubleValue();
Review comment:
Is there any chance of an overflow here? Could you add property-based tests
for this?
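Some context on the overflow question: `BigDecimal.multiply` is exact and does not overflow for finite `double` inputs, but `doubleValue()` returns `Double.POSITIVE_INFINITY` when the product exceeds `Double.MAX_VALUE`, and `BigDecimal.valueOf(double)` throws `NumberFormatException` if an input is `NaN` or infinite. The sketch below isolates the arithmetic in a hypothetical `SizeMath.estimateSize` helper and clamps to `Double.MAX_VALUE`; clamping rather than failing is an assumption about the desired behaviour, and both inputs are assumed finite and non-negative.

```java
import java.math.BigDecimal;

/** Hypothetical helper holding the getSize() arithmetic from the diff. */
final class SizeMath {
  private SizeMath() {}

  /**
   * Remaining work (seconds) times throughput (bytes per second).
   * Assumes both inputs are finite and non-negative; BigDecimal.valueOf throws
   * NumberFormatException for NaN or infinite values.
   */
  static double estimateSize(double workRemainingSeconds, double bytesPerSecond) {
    final BigDecimal product =
        BigDecimal.valueOf(workRemainingSeconds).multiply(BigDecimal.valueOf(bytesPerSecond));
    // The BigDecimal product is exact; only the conversion back to double can overflow.
    final double size = product.doubleValue();
    return size == Double.POSITIVE_INFINITY ? Double.MAX_VALUE : size;
  }
}
```

A property-based test (for example with junit-quickcheck, which the module's `TimestampGenerator` already uses) could then assert that `estimateSize` stays finite and non-negative for arbitrary finite, non-negative inputs.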
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
##########
@@ -146,6 +156,17 @@ public TimestampRange initialRestriction(@Element PartitionMetadata partition) {
return TimestampRange.of(startTimestamp, endTimestamp);
}
+ @GetSize
+ public double getSize(@Element PartitionMetadata partition, @Restriction TimestampRange range)
+ throws Exception {
+ final BigDecimal timeGapInSeconds =
+ BigDecimal.valueOf(newTracker(partition, range).getProgress().getWorkRemaining());
Review comment:
Instead of creating a new tracker here, you can add a parameter of type
`RestrictionTracker` to `getSize`; Beam then passes in a tracker initialized
for the current restriction (as per
https://github.com/apache/beam/blob/bff4be488e12415cedc2e713f4e5129a18a14497/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L965-L967).
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
##########
@@ -45,12 +49,20 @@
public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Timestamp>
implements HasProgress {
+ private static final Logger LOG = LoggerFactory.getLogger(TimestampRangeTracker.class);
protected TimestampRange range;
protected @Nullable Timestamp lastAttemptedPosition;
protected @Nullable Timestamp lastClaimedPosition;
+ protected TimeFunction timeFunction;
Review comment:
You can use `java.time.Clock` instead:
https://docs.oracle.com/javase/8/docs/api/java/time/Clock.html
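A minimal sketch of injecting `java.time.Clock`, assuming the tracker only needs "now" to measure how far its last claimed position lags behind the current time. `ClockAwareTracker` and `secondsBehindNow()` are hypothetical names, and `Instant` stands in for `com.google.cloud.Timestamp`. Production code would pass `Clock.systemUTC()`, while tests pass `Clock.fixed(...)`; both of these JDK clocks are `Serializable`, should that matter here.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical tracker fragment that reads the current time from an injected Clock. */
class ClockAwareTracker {
  private final Clock clock;
  private final Instant lastClaimedPosition;

  ClockAwareTracker(Clock clock, Instant lastClaimedPosition) {
    this.clock = clock;
    this.lastClaimedPosition = lastClaimedPosition;
  }

  /** Seconds between the last claimed position and clock.instant(), floored at zero. */
  double secondsBehindNow() {
    final Duration gap = Duration.between(lastClaimedPosition, clock.instant());
    return Math.max(0D, gap.toMillis() / 1000D);
  }
}
```

Compared with a custom `TimeFunction`, this reuses a standard type that tests can pin with `Clock.fixed` without writing a fake.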
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** An estimator to provide an estimate on the throughput of the outputted elements. */
+public class ThroughputEstimator implements Serializable {
+
+ private static class Pair<K, V> {
Review comment:
Could you use `ImmutablePair` from Apache Commons Lang instead?
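For reference, commons-lang3's `org.apache.commons.lang3.tuple.ImmutablePair` offers `ImmutablePair.of(left, right)` with `getLeft()`/`getRight()`. If adding that dependency is undesirable, the JDK's `AbstractMap.SimpleImmutableEntry` also works; both are `Serializable`, which `ThroughputEstimator` needs. A minimal sketch with the JDK type, where `WindowHistory` is a hypothetical stand-in for the estimator's queue and window starts are epoch seconds rather than `Timestamp`:

```java
import java.io.Serializable;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.Map;

/** Hypothetical queue of (window start second, bytes) entries without a hand-written Pair class. */
class WindowHistory implements Serializable {
  private static final long serialVersionUID = 1L;
  private final ArrayDeque<Map.Entry<Long, Double>> windows = new ArrayDeque<>();

  /** Appends one closed window as an immutable (start, bytes) entry. */
  void add(long windowStartSeconds, double bytes) {
    windows.addLast(new SimpleImmutableEntry<>(windowStartSeconds, bytes));
  }

  /** Oldest window still held, or null when empty. */
  Map.Entry<Long, Double> oldest() {
    return windows.peekFirst();
  }
}
```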
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TimestampGenerator.java
##########
@@ -34,7 +33,7 @@ public TimestampGenerator() {
@Override
public Timestamp generate(SourceOfRandomness random, GenerationStatus status) {
- final long seconds = random.nextLong(MIN_SECONDS, MAX_SECONDS);
+ final long seconds = random.nextLong(0L, MAX_SECONDS);
Review comment:
Why did we change this? I think the generator should always generate values
from the whole valid domain. If you'd like to restrict it in specific tests, we
can use assumptions
(https://pholser.github.io/junit-quickcheck/site/1.0/usage/constraining.html).
We will probably also have to extend this generator to accept configuration.
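One way to extend the generator with configuration while keeping the full domain as the default, sketched in plain Java. `BoundedEpochSeconds` is hypothetical, `SplittableRandom` stands in for junit-quickcheck's `SourceOfRandomness`, and the domain bounds assume Cloud Spanner's TIMESTAMP range (0001-01-01 to 9999-12-31 UTC); the real generator's `MIN_SECONDS`/`MAX_SECONDS` values are not shown in this diff.

```java
import java.time.Instant;
import java.util.SplittableRandom;

/** Hypothetical seconds generator: whole domain by default, narrower only when configured. */
class BoundedEpochSeconds {
  // Assumed domain: Cloud Spanner TIMESTAMP values, 0001-01-01 to 9999-12-31 (UTC).
  static final long DOMAIN_MIN = Instant.parse("0001-01-01T00:00:00Z").getEpochSecond();
  static final long DOMAIN_MAX = Instant.parse("9999-12-31T23:59:59Z").getEpochSecond();

  private final long min;
  private final long max;

  BoundedEpochSeconds() {
    this(DOMAIN_MIN, DOMAIN_MAX);
  }

  BoundedEpochSeconds(long min, long max) {
    if (min < DOMAIN_MIN || max > DOMAIN_MAX || min > max) {
      throw new IllegalArgumentException("invalid range [" + min + ", " + max + "]");
    }
    this.min = min;
    this.max = max;
  }

  /** Uniform value in [min, max]; nextLong's upper bound is exclusive, hence max + 1. */
  long generate(SplittableRandom random) {
    return random.nextLong(min, max + 1);
  }
}
```

Tests that need a narrower range can construct a narrower instance, or keep the default and filter with JUnit's `assumeTrue`, so the default still exercises pre-epoch seconds.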
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** An estimator to provide an estimate on the throughput of the outputted elements. */
+public class ThroughputEstimator implements Serializable {
+
+ private static class Pair<K, V> {
+ private final K first;
+ private final V second;
+
+ public Pair(K first, V second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ public K getFirst() {
+ return first;
+ }
+
+ public V getSecond() {
+ return second;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("first: %s, second: %s", first, second);
+ }
+ }
+
+ private static final long serialVersionUID = -3597929310338724800L;
+ // The start time of each per-second window.
+ private Timestamp startTimeOfCurrentWindow;
+ // The bytes of the current window.
+ private double bytesInCurrentWindow;
+ // The number of seconds to look in the past.
+ private final int numOfSeconds = 60;
+ // The total bytes of all windows in the queue.
+ private double bytesInQueue;
+ // The queue holds a number of windows in the past in order to calculate
+ // a rolling windowing throughput.
+ private Queue<Pair<Timestamp, Double>> queue;
+
+ public ThroughputEstimator() {
+ queue = new ArrayDeque<>();
+ }
+
+ /**
+ * Updates the estimator with the bytes of records.
+ *
+ * @param timeOfRecords the committed timestamp of the records
+ * @param bytes the total bytes of the records
+ */
+ public void update(Timestamp timeOfRecords, double bytes) {
+ if (startTimeOfCurrentWindow == null) {
+ bytesInCurrentWindow = bytes;
+ startTimeOfCurrentWindow = timeOfRecords;
+ return;
+ }
+
+ if (timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1) {
Review comment:
What happens if `timeOfRecords` equals `startTimeOfCurrentWindow`?
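Because `getSeconds()` returns an integer, `timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1` is equivalent to `<=`, so a record whose seconds equal the window start (and any earlier, out-of-order record) stays in the current window. The rest of `update` is not shown here, so the following is a hypothetical rolling per-second estimator that spells out that case. `RollingThroughput`, epoch-second inputs, evicting only when a window closes, and reporting closed-window bytes divided by the look-back length are all assumptions, not the PR's code.

```java
import java.util.ArrayDeque;

/** Hypothetical rolling per-second throughput estimator (bytes per second). */
class RollingThroughput {
  /** One closed per-second window: its start second and the bytes seen in it. */
  private static final class Window {
    final long startSecond;
    final double bytes;

    Window(long startSecond, double bytes) {
      this.startSecond = startSecond;
      this.bytes = bytes;
    }
  }

  private final int windowSeconds;
  private final ArrayDeque<Window> closed = new ArrayDeque<>();
  private double bytesInClosed;
  private Long currentSecond; // null until the first update
  private double bytesInCurrent;

  RollingThroughput(int windowSeconds) {
    this.windowSeconds = windowSeconds;
  }

  void update(long second, double bytes) {
    if (currentSecond == null) {
      currentSecond = second;
      bytesInCurrent = bytes;
      return;
    }
    if (second <= currentSecond) {
      // Same second (the equality case) or an out-of-order record: count it in the open window.
      bytesInCurrent += bytes;
      return;
    }
    // A later second closes the open window and starts a new one.
    closed.addLast(new Window(currentSecond, bytesInCurrent));
    bytesInClosed += bytesInCurrent;
    currentSecond = second;
    bytesInCurrent = bytes;
    // Drop closed windows that fall outside the look-back period.
    while (!closed.isEmpty() && closed.peekFirst().startSecond <= second - windowSeconds) {
      bytesInClosed -= closed.pollFirst().bytes;
    }
  }

  /** Bytes from closed windows in the look-back period, divided by its length in seconds. */
  double get() {
    return bytesInClosed / windowSeconds;
  }
}
```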
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TimestampGenerator.java
##########
@@ -34,7 +33,7 @@ public TimestampGenerator() {
@Override
public Timestamp generate(SourceOfRandomness random, GenerationStatus status) {
- final long seconds = random.nextLong(MIN_SECONDS, MAX_SECONDS);
+ final long seconds = random.nextLong(0L, MAX_SECONDS);
Review comment:
Nevertheless, IMHO we should keep testing with the full MIN_SECONDS to MAX_SECONDS range.
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import org.junit.Test;
+
+public class ThroughputEstimatorTest {
Review comment:
Could we use property-based testing here?
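Real property-based tests here would likely use junit-quickcheck's `@Property` with the module's generators. As a dependency-free illustration of the idea, the sketch below checks a property over many seeded random inputs and returns the first counterexample. `MiniProperty`, `findCounterexample`, and `inCurrentWindow` (which restates the window check from `ThroughputEstimator.update` on epoch seconds) are hypothetical.

```java
import java.util.SplittableRandom;
import java.util.function.LongPredicate;

/** Hypothetical, dependency-free stand-in for a property-based test runner. */
final class MiniProperty {
  private MiniProperty() {}

  /**
   * Checks the property on `trials` seeded random values in [min, max] and returns
   * the first value that violates it, or null if none does. Assumes max < Long.MAX_VALUE.
   */
  static Long findCounterexample(
      long seed, int trials, long min, long max, LongPredicate property) {
    final SplittableRandom random = new SplittableRandom(seed);
    for (int i = 0; i < trials; i++) {
      final long value = random.nextLong(min, max + 1);
      if (!property.test(value)) {
        return value;
      }
    }
    return null;
  }

  /** The diff's window check on epoch seconds: true means the record joins the current window. */
  static boolean inCurrentWindow(long windowStartSeconds, long recordSeconds) {
    return recordSeconds < windowStartSeconds + 1;
  }
}
```

For example, `findCounterexample(1L, 1000, lo, hi, s -> inCurrentWindow(s, s))` should return `null` for any range, which pins down the equality case raised earlier in this review.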
--
This is an automated message from the Apache Git Service.