gemini-code-assist[bot] commented on code in PR #38346:
URL: https://github.com/apache/beam/pull/38346#discussion_r3169893949


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -856,6 +867,11 @@ public void flushState() {
       userTimerInternals.persistTo(outputBuilder);
     }
 
+    @Override
+    public void setBacklogBytes(double backlogBytes) {
+      StreamingModeExecutionContext.this.backlogBytes = (long) backlogBytes;

Review Comment:
   Priority: medium
   
   The `backlogBytes` value is cast from `double` to `long` here, which truncates
   toward zero. In `flushState()` (line 532), a value of `1L` is explicitly
   ignored in order to filter out legacy default values. Any user-reported
   backlog in the range `[1.0, 2.0)` (e.g., `1.5`) is therefore truncated to `1L`
   and then silently ignored. A backlog of about one byte is usually negligible
   for autoscaling, but this truncation could cause unexpected behavior when the
   user-reported value is meant to be significant.
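The interaction can be reproduced in isolation. The following is a minimal, hypothetical sketch: `isReported()` and `IGNORED_LEGACY_BACKLOG` are illustrative stand-ins for the check in `flushState()` as described above, not the actual worker code. It shows which `double` inputs collapse to the ignored `1L` under the narrowing cast, and how `Math.round` (one possible alternative, not something the PR proposes) behaves for comparison.

```java
// Hypothetical model of the setBacklogBytes() / flushState() interaction.
// isReported() is an illustrative stand-in, not the Dataflow worker's code.
public class BacklogTruncationDemo {
  /** The legacy default value that flushState() is described as ignoring. */
  static final long IGNORED_LEGACY_BACKLOG = 1L;

  /** Returns true if a backlog value would survive the legacy-default filter. */
  static boolean isReported(long backlogBytes) {
    return backlogBytes != IGNORED_LEGACY_BACKLOG;
  }

  public static void main(String[] args) {
    double[] userValues = {1.0, 1.5, 1.99, 2.0};
    for (double v : userValues) {
      long truncated = (long) v;    // same narrowing cast as setBacklogBytes()
      long rounded = Math.round(v); // alternative: round to nearest
      System.out.printf(
          "%.2f -> cast=%d (reported=%b), round=%d (reported=%b)%n",
          v, truncated, isReported(truncated), rounded, isReported(rounded));
    }
  }
}
```

With the cast, `1.0`, `1.5`, and `1.99` all become `1L` and are dropped. Rounding does not remove the collision; it only shifts the affected inputs to `[0.5, 1.5)`.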



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java:
##########
@@ -626,6 +627,22 @@ public ProcessContinuation processElement(
       return ProcessContinuation.resume();
     }
 
+    @GetSize
+    public double getSize(
+        @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      try (UnboundedReader<OutputT> reader =
+          restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) {

Review Comment:
   Priority: medium
   
   Creating and starting a reader in `getSize` can be expensive for some
   `UnboundedSource` implementations. As an optimization, consider checking
   whether the source is an instance of `EmptyUnboundedSource` and, if so,
   returning `0.0` immediately, so that no reader is created for a source that
   has nothing to read.
   
   ```java
         if (restriction.getSource() instanceof EmptyUnboundedSource) {
           return 0.0;
         }
         try (UnboundedReader<OutputT> reader =
             restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) {
   ```
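The effect of the early return can be checked without Beam on the classpath. The following is a minimal sketch under loud assumptions: `Source`, `Reader`, `EmptySource`, `FixedBacklogSource`, and `backlogBytes()` are hypothetical, simplified stand-ins for `UnboundedSource`, `UnboundedReader`, and `EmptyUnboundedSource`. They are not the Beam API, and the real `getSize` body in this PR is only partially shown above. A counter models the cost of reader creation, so the demo shows that the `instanceof` check skips it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for Beam's UnboundedSource / UnboundedReader /
// EmptyUnboundedSource. NOT the Beam classes; they only model reader cost.
public class GetSizeShortCircuitDemo {
  /** Counts how many (potentially expensive) readers have been created. */
  static final AtomicInteger READERS_CREATED = new AtomicInteger();

  interface Reader extends AutoCloseable {
    double backlogBytes();

    @Override
    void close(); // narrowed: no checked exception
  }

  interface Source {
    Reader createReader();
  }

  /** Models a real source whose reader reports a fixed backlog. */
  static final class FixedBacklogSource implements Source {
    private final double backlog;

    FixedBacklogSource(double backlog) {
      this.backlog = backlog;
    }

    @Override
    public Reader createReader() {
      READERS_CREATED.incrementAndGet(); // stands in for expensive setup
      return new Reader() {
        @Override public double backlogBytes() { return backlog; }
        @Override public void close() {}
      };
    }
  }

  /** Models EmptyUnboundedSource: nothing left to read. */
  static final class EmptySource implements Source {
    @Override
    public Reader createReader() {
      READERS_CREATED.incrementAndGet();
      return new Reader() {
        @Override public double backlogBytes() { return 0.0; }
        @Override public void close() {}
      };
    }
  }

  /** Size estimate with the suggested early return for empty sources. */
  static double getSize(Source source) {
    if (source instanceof EmptySource) {
      return 0.0; // skip reader creation entirely
    }
    try (Reader reader = source.createReader()) {
      return reader.backlogBytes();
    }
  }

  public static void main(String[] args) {
    System.out.println(getSize(new EmptySource()));            // 0.0
    System.out.println(getSize(new FixedBacklogSource(42.0))); // 42.0
    System.out.println("readers created: " + READERS_CREATED.get()); // 1
  }
}
```

Only the non-empty source pays for a reader. The check is a cheap type test, so the common (non-empty) path costs essentially nothing extra.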



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
