gemini-code-assist[bot] commented on code in PR #38346:
URL: https://github.com/apache/beam/pull/38346#discussion_r3169893949
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -856,6 +867,11 @@ public void flushState() {
userTimerInternals.persistTo(outputBuilder);
}
+ @Override
+ public void setBacklogBytes(double backlogBytes) {
+ StreamingModeExecutionContext.this.backlogBytes = (long) backlogBytes;
Review Comment:

The `backlogBytes` value is cast from `double` to `long` here, which truncates
toward zero. In `flushState()` (line 532), a value of `1L` is explicitly
ignored to filter out legacy default values. Any user-reported backlog in the
range `[1.0, 2.0)` (e.g., `1.5`) is therefore truncated to `1L` and then
ignored. A backlog of about 1 byte is typically negligible for autoscaling,
but the value is silently dropped rather than reported, which could be
surprising if the user intended it to be significant.
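
To make the truncation concrete, here is a minimal standalone sketch. It is not
code from the PR: the class `BacklogCast`, the constant `LEGACY_DEFAULT`, and
the `roundUp` alternative are hypothetical names used only for illustration,
and the sentinel value simply mirrors the `1L` check described above.

```java
public class BacklogCast {
    // Hypothetical stand-in for the legacy default that flushState() is said to ignore.
    static final long LEGACY_DEFAULT = 1L;

    // Behavior in the diff: a narrowing cast from double to long truncates toward zero.
    static long truncate(double backlogBytes) {
        return (long) backlogBytes;
    }

    // One possible alternative (an assumption, not the PR's code): round up,
    // so any value strictly between 1.0 and 2.0 becomes 2 instead of 1.
    static long roundUp(double backlogBytes) {
        return (long) Math.ceil(backlogBytes);
    }

    public static void main(String[] args) {
        double reported = 1.5;
        // true: the truncated value collides with the ignored sentinel
        System.out.println(truncate(reported) == LEGACY_DEFAULT);
        // false: the rounded-up value is 2 and would be reported
        System.out.println(roundUp(reported) == LEGACY_DEFAULT);
    }
}
```

Rounding up does not help for a reported value of exactly `1.0`, which still
maps to `1L`; whether that case matters depends on how the sentinel check in
`flushState()` is meant to behave.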
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java:
##########
@@ -626,6 +627,22 @@ public ProcessContinuation processElement(
return ProcessContinuation.resume();
}
+ @GetSize
+ public double getSize(
+ @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
+ PipelineOptions pipelineOptions)
+ throws Exception {
+ try (UnboundedReader<OutputT> reader =
+ restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) {
Review Comment:

Creating and starting a reader in `getSize` can be expensive for some
`UnboundedSource` implementations. As an optimization, consider checking if the
source is an instance of `EmptyUnboundedSource` and returning `0.0` immediately
to avoid unnecessary reader creation.
```java
// Short-circuit: avoid creating a reader for an EmptyUnboundedSource.
if (restriction.getSource() instanceof EmptyUnboundedSource) {
  return 0.0;
}
try (UnboundedReader<OutputT> reader =
    restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) {
```