gemini-code-assist[bot] commented on PR #35523:
URL: https://github.com/apache/beam/pull/35523#issuecomment-3163387048

   ## Summary of Changes
   
   This pull request makes Dataflow streaming more robust by allowing a single 
logical stream to transition between physical gRPC streams over its lifetime. 
The system can now cycle the underlying gRPC connections proactively and 
recover from transient failures by re-flushing pending requests onto a new 
connection, improving the reliability and stability of long-running streaming 
jobs.
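
To make the handover and re-flush idea concrete, here is a minimal sketch in plain Java. It is not Beam's `AbstractWindmillStream`: `FakePhysicalStream`, `LogicalStreamSketch`, and their methods are hypothetical names, and the sketch assumes a single-threaded caller and string requests. The logical stream records which physical stream each pending request was sent on. On a graceful handover it half-closes the old stream and leaves that stream's in-flight requests in place. When a stream fails, it re-sends that stream's unanswered requests onto the live stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical stand-in for one gRPC call (a "physical stream"); not a Beam class. */
final class FakePhysicalStream {
  final List<String> sent = new ArrayList<>();
  boolean halfClosed;

  void send(String request) {
    // Like the test fake described in the PR, sending after half-close is treated as a bug.
    if (halfClosed) throw new IllegalStateException("send after half-close");
    sent.add(request);
  }

  void halfClose() { halfClosed = true; }
}

/** Hypothetical logical stream that outlives the physical streams beneath it. */
final class LogicalStreamSketch {
  private final Supplier<FakePhysicalStream> newPhysicalStream;
  private final Map<Long, String> pending = new LinkedHashMap<>();
  // Which physical stream each pending request was most recently sent on.
  private final Map<Long, FakePhysicalStream> sentOn = new LinkedHashMap<>();
  // Half-closed streams that may still deliver responses; identity-based on purpose.
  private final Set<FakePhysicalStream> closing =
      Collections.newSetFromMap(new IdentityHashMap<>());
  private FakePhysicalStream current;
  private long nextId;

  LogicalStreamSketch(Supplier<FakePhysicalStream> newPhysicalStream) {
    this.newPhysicalStream = newPhysicalStream;
    this.current = newPhysicalStream.get();
  }

  long send(String request) {
    long id = nextId++;
    pending.put(id, request);
    sentOn.put(id, current);
    current.send(request);
    return id;
  }

  void onResponse(long id) {
    pending.remove(id);
    sentOn.remove(id);
  }

  /** Graceful handover: the old stream keeps its in-flight requests and is half-closed. */
  void handover() {
    current.halfClose();
    closing.add(current);
    current = newPhysicalStream.get();
  }

  /** A physical stream failed: re-flush everything it still owed onto a live stream. */
  void onPhysicalStreamFailure(FakePhysicalStream failed) {
    closing.remove(failed);
    if (failed == current) {
      current = newPhysicalStream.get();
    }
    for (Map.Entry<Long, FakePhysicalStream> e : sentOn.entrySet()) {
      if (e.getValue() == failed) {
        e.setValue(current);
        current.send(pending.get(e.getKey()));
      }
    }
  }

  int pendingCount() { return pending.size(); }

  int closingCount() { return closing.size(); }
}
```

Leaving requests on the half-closed stream during a graceful handover avoids duplicate sends; a request is re-sent only once the stream it was on can no longer deliver a response.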
   
   ### Highlights
   
   * **Physical Stream Handover**: Implemented a core mechanism within 
`AbstractWindmillStream` to gracefully transition between physical gRPC streams 
(e.g., half-closing an old connection and establishing a new one) within the 
same logical stream. This allows for proactive stream cycling and improved 
resilience for long-lived streaming connections.
   * **Automatic Request Re-flushing**: Enhanced the stream handling to 
automatically re-send pending requests from a failed physical stream (including 
one that was half-closed during a handover) onto the current physical stream, so 
that requests in flight during a transition or a transient connection failure 
are retried rather than dropped.
   * **Configurable Stream Management**: Introduced new parameters in 
`AbstractWindmillStream` and `GrpcWindmillStreamFactory` to configure the 
duration after which a physical stream should be half-closed 
(`halfClosePhysicalStreamAfter`) and to provide a `ScheduledExecutorService` 
for managing these transitions, offering greater control over the stream lifecycle.
   * **Comprehensive Test Coverage**: Added extensive new unit tests across 
`GrpcCommitWorkStreamTest` and `GrpcGetDataStreamTest` to validate the complex 
scenarios involving multiple physical streams, handovers, stream failures (both 
old and new streams), and proper shutdown/half-close behavior. A custom 
`TriggeredScheduledExecutorService` was introduced to enable deterministic 
testing of asynchronous stream events.
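
The `halfClosePhysicalStreamAfter` setting can be pictured as a timer that is re-armed whenever a physical stream starts, which is roughly what scheduling `onHalfClosePhysicalStreamTimeout` from `startStream()` implies. The sketch below assumes that a `null` duration disables cycling and that the callback performs the actual handover. `HalfCloseTimer`, `onPhysicalStreamStarted`, and `cancelPending` are hypothetical names; `halfCloseFuture` and `onHalfClosePhysicalStreamTimeout` only echo names from the changelog, not their real signatures.

```java
import java.time.Duration;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of proactive stream cycling: each time a physical stream starts, arm a
 * timer that runs the half-close callback once halfClosePhysicalStreamAfter has elapsed.
 */
final class HalfCloseTimer {
  private final Duration halfClosePhysicalStreamAfter; // Assumption: null disables cycling.
  private final ScheduledExecutorService executor;
  private final Runnable onHalfClosePhysicalStreamTimeout;
  private Future<?> halfCloseFuture; // Guarded by this.

  HalfCloseTimer(
      Duration halfClosePhysicalStreamAfter,
      ScheduledExecutorService executor,
      Runnable onHalfClosePhysicalStreamTimeout) {
    this.halfClosePhysicalStreamAfter = halfClosePhysicalStreamAfter;
    this.executor = executor;
    this.onHalfClosePhysicalStreamTimeout = onHalfClosePhysicalStreamTimeout;
  }

  /** Call after every new physical stream is started; replaces any previously armed timer. */
  synchronized void onPhysicalStreamStarted() {
    cancelPending();
    if (halfClosePhysicalStreamAfter != null) {
      halfCloseFuture =
          executor.schedule(
              onHalfClosePhysicalStreamTimeout,
              halfClosePhysicalStreamAfter.toNanos(),
              TimeUnit.NANOSECONDS);
    }
  }

  /** Call on shutdown or halfClose() of the logical stream so no handover fires afterwards. */
  synchronized void cancelPending() {
    if (halfCloseFuture != null) {
      halfCloseFuture.cancel(false); // Do not interrupt a timeout callback already running.
      halfCloseFuture = null;
    }
  }
}
```

Using `cancel(false)` lets a timeout that has already started finish its handover instead of being interrupted partway through.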
   
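Deterministic tests of timer-driven handovers need a scheduler whose delays the test controls. The PR's `TriggeredScheduledExecutorService` exposes `unblockNextFuture()` for this. Its internals are not described here, so the following is an independent, hypothetical sketch: `ManualScheduledExecutor` and `runNextScheduled()` are illustrative names. It ignores requested delays, runs plain `execute()` calls inline, and is meant for single-threaded test code only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical test double, for single-threaded test code only: execute() runs tasks inline,
 * while schedule() ignores the requested delay and queues the task until the test triggers it.
 */
final class ManualScheduledExecutor extends AbstractExecutorService
    implements ScheduledExecutorService {

  /** A FutureTask that also satisfies ScheduledFuture; its delay is never consulted. */
  private static final class ManualTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
    ManualTask(Callable<V> callable) {
      super(callable);
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return 0; // The test decides when a task is due, so the delay is always reported as 0.
    }

    @Override
    public int compareTo(Delayed other) {
      return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
  }

  private final Deque<ManualTask<?>> scheduled = new ArrayDeque<>();
  private boolean shutdown;

  /** Runs the oldest scheduled task that was not cancelled; returns false if none was waiting. */
  boolean runNextScheduled() {
    ManualTask<?> task;
    while ((task = scheduled.poll()) != null) {
      if (!task.isCancelled()) {
        task.run();
        return true;
      }
    }
    return false;
  }

  @Override
  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    if (shutdown) throw new RejectedExecutionException("executor is shut down");
    ManualTask<V> task = new ManualTask<>(callable);
    scheduled.add(task);
    return task;
  }

  @Override
  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    return schedule(Executors.callable(command), delay, unit);
  }

  @Override
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long first, long period, TimeUnit u) {
    throw new UnsupportedOperationException("periodic tasks are out of scope for this sketch");
  }

  @Override
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable r, long first, long delay, TimeUnit u) {
    throw new UnsupportedOperationException("periodic tasks are out of scope for this sketch");
  }

  @Override
  public void execute(Runnable command) {
    if (shutdown) throw new RejectedExecutionException("executor is shut down");
    command.run();
  }

  @Override public void shutdown() { shutdown = true; }
  @Override public boolean isShutdown() { return shutdown; }
  @Override public boolean isTerminated() { return shutdown; }
  @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }

  @Override
  public List<Runnable> shutdownNow() {
    shutdown = true;
    List<Runnable> neverRun = new ArrayList<Runnable>(scheduled);
    scheduled.clear();
    return neverRun;
  }
}
```

Because nothing scheduled runs until the test calls `runNextScheduled()`, a test can send requests, trigger exactly one half-close, and then assert which physical stream each request landed on.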
   <details>
   <summary><b>Changelog</b></summary>
   
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java**
       * Changed `executor` field type from `ExecutorService` to 
`ScheduledExecutorService` to support scheduled tasks.
       * Added new fields: `halfClosePhysicalStreamAfter` (Duration) for 
configuring stream handover timing, `halfCloseFuture` (Future) to manage 
scheduled half-close tasks, and `closingPhysicalStreams` (Set) to track 
physical streams that are in the process of closing.
       * Modified the constructor to accept `halfClosePhysicalStreamAfter` and 
`ScheduledExecutorService` parameters.
       * Refactored `onNewStream()` into `onFlushPending(boolean isNewStream)` 
to handle both initial stream setup and re-flushing of pending requests onto 
new streams.
       * Updated `startStream()` logic to schedule 
`onHalfClosePhysicalStreamTimeout` when `halfClosePhysicalStreamAfter` is 
configured, enabling proactive stream cycling.
       * Modified `halfClose()` and `shutdown()` methods to correctly manage 
the state of multiple physical streams and ensure all pending work is handled 
before full termination.
       * Removed the `maybeTearDownStream` helper method, integrating its logic 
into `onPhysicalStreamCompletion`.
       * Introduced `completeShutdown()` to centralize the final steps of 
stream termination.
       * Added `onHalfClosePhysicalStreamTimeout` to handle the scheduled 
half-closing of physical streams.
       * Updated `onPhysicalStreamCompletion` to manage 
`closingPhysicalStreams` and trigger `onFlushPending` for active streams.
       * Added `clearCurrentPhysicalStream` helper method for consistent state 
cleanup.
       * Overrode `equals` and `hashCode` methods in `PhysicalStreamHandler` to 
use identity-based comparisons for stream instances.
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java**
       * Removed the default `setBudget(long newItems, long newBytes)` method 
from the `GetWorkStream` interface, simplifying the interface.
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java**
       * Updated the constructor to pass the new `halfClosePhysicalStreamAfter` 
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream` 
superclass.
       * Modified `onFlushPending` to prevent resending requests that are 
already active on physical streams that are in the process of being half-closed.
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java**
       * Updated the constructor to pass the new `halfClosePhysicalStreamAfter` 
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream` 
superclass.
       * Modified `onFlushPending` to only send initial requests when a new 
stream is established.
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java**
       * Updated the constructor to pass the new `halfClosePhysicalStreamAfter` 
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream` 
superclass.
       * Modified `onFlushPending` to only send initial requests when a new 
stream is established.
       * Enhanced `shutdownInternal` to ensure pending requests on all 
`closingPhysicalStreams` are properly cancelled.
       * Updated a comment in `trySendBatch` to reflect the new 
`onFlushPending` behavior for handling finalized batches.
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java**
       * Updated the constructor to pass the new `halfClosePhysicalStreamAfter` 
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream` 
superclass.
       * Modified `onFlushPending` to only send initial requests when a new 
stream is established.
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java**
       * Updated the constructor to pass the new `halfClosePhysicalStreamAfter` 
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream` 
superclass.
       * Modified `onFlushPending` to only send initial requests when a new 
stream is established.
   * **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java**
       * Introduced 
`DEFAULT_DIRECT_STREAMING_RPC_PHYSICAL_STREAM_HALF_CLOSE_AFTER` constant for 
default handover duration.
       * Added a memoized `DIRECT_STREAM_SCHEDULED_EXECUTOR_SERVICE` (a shared 
thread pool) for direct streaming RPCs.
       * Added `directStreamingRpcPhysicalStreamHalfCloseAfter` and `executor` 
fields to the factory to manage new stream parameters.
       * Updated the factory constructor and `create` method to accept and 
propagate the new `halfClosePhysicalStreamAfter` and `ScheduledExecutorService` 
parameters.
       * Added `setDirectStreamingRpcPhysicalStreamHalfCloseAfter` and 
`setScheduledExecutorService` methods to the `Builder` for configuration.
       * Modified all `create*Stream` methods to pass the new `Duration` and 
`ScheduledExecutorService` parameters to the respective stream constructors.
       * Introduced `executorForDispatchedStreams` and 
`executorForDirectStreams` helper methods to provide appropriate 
`ScheduledExecutorService` instances based on stream type.
       * Ensured direct stream stubs (`createDirectGetWorkStream`, 
`createDirectGetDataStream`, `createDirectCommitWorkStream`) use 
`withDefaultDeadline`.
   * **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java**
       * Reduced sleep/wait times in `awaitTermination` and 
`waitForDroppedCommits` for faster test execution and increased sensitivity to 
timing issues.
   * **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java**
       * Reduced sleep/wait times in `testKeyCommitTooLargeException` and 
`testExceptions` for faster test execution.
   * **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java**
       * Updated `TestStream` constructor to accept `Duration` and 
`ScheduledExecutorService` to align with `AbstractWindmillStream` changes.
       * Modified `onNewStream()` to `onFlushPending(boolean isNewStream)` and 
updated its internal logic to reflect the new behavior.
       * Integrated `TriggeredScheduledExecutorService` into `TestStream` for 
controlled testing of scheduled events.
   * **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/TriggeredScheduledExecutorService.java**
       * **New file:** Implements a custom `ScheduledExecutorService` designed 
for testing asynchronous operations. It allows test code to manually trigger 
scheduled tasks using `unblockNextFuture()`, providing deterministic control 
over timing-dependent logic.
   * **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/FakeWindmillGrpcService.java**
       * Added `failConnectionsAndWait` method to simulate transient connection 
failures for testing purposes.
       * Introduced `noMoreStreamsExpected` and `failedStreamConnectsRemaining` 
fields to control and track expected stream connections during tests.
       * Enhanced `StreamInfoObserver`'s `onNext` method to throw an 
`IllegalStateException` if requests are sent on a stream that has already been 
half-closed or fully closed, improving test robustness.
   * **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java**
       * Added extensive new test cases 
(`testCommitWorkItem_multiplePhysicalStreams`, `_OldStreamFails`, 
`_newStreamFailsWhileEmpty`, `_newStreamFailsWithRequests`, 
`_multipleHandovers`, `_shutdown`, `_halfClose`) to thoroughly validate the new 
physical stream transition and request re-flushing logic under various 
conditions, including failures and graceful shutdowns.
   * **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java**
       * Added extensive new test cases 
(`testRequestKeyedData_multiplePhysicalStreams`, `_OldStreamFails`, 
`_newStreamFailsWhileEmpty`, `_newStreamFailsWithRequests`, 
`_multipleHandovers_allResponsesReceived`, 
`_OldStreamFailsWhileNewStreamInBackoff`, `_multipleHandovers_shutdown`, 
`_multipleHandovers_halfClose`) to thoroughly validate the new physical stream 
transition and request re-flushing logic under various conditions, including 
failures and graceful shutdowns.
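
A minimal sketch of the `onFlushPending(boolean isNewStream)` split described above, assuming physical streams are modeled as plain objects and requests as strings. `FlushingStreamSketch`, `HandshakeOnlyStream`, and `CommitStreamSketch` are hypothetical. The GetWork-style subclass sends its initial request only on a new stream. The CommitWork-style subclass re-sends pending commits, skipping any commit still in flight on the current stream or on a stream that is being half-closed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical base class: the hook name echoes the PR summary, the bodies are illustrative. */
abstract class FlushingStreamSketch {
  final List<String> sent = new ArrayList<>();

  /** Called after a physical stream is (re)established or pending work must be re-sent. */
  protected abstract void onFlushPending(boolean isNewStream);

  protected void send(String request) {
    sent.add(request);
  }
}

/** GetWork-style: only the initial request is sent, and only on a brand-new stream. */
final class HandshakeOnlyStream extends FlushingStreamSketch {
  @Override
  protected void onFlushPending(boolean isNewStream) {
    if (isNewStream) send("initial-request");
  }
}

/** CommitWork-style: re-send commits whose physical stream can no longer answer them. */
final class CommitStreamSketch extends FlushingStreamSketch {
  Object current = new Object(); // Stand-in for the active physical stream.
  final Map<Long, Object> ownerById = new LinkedHashMap<>(); // Commit id -> stream it is on.
  final Set<Object> closingPhysicalStreams = Collections.newSetFromMap(new IdentityHashMap<>());

  @Override
  protected void onFlushPending(boolean isNewStream) {
    if (isNewStream) send("initial-request");
    for (Map.Entry<Long, Object> e : ownerById.entrySet()) {
      Object owner = e.getValue();
      // Still in flight on a stream that may yet respond: re-sending would duplicate it.
      if (owner == current || closingPhysicalStreams.contains(owner)) continue;
      send("commit-" + e.getKey());
      e.setValue(current);
    }
  }
}
```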
   </details>
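
The identity-based `equals`/`hashCode` on `PhysicalStreamHandler`, like the reviewer's `IdentityHashMap` suggestion noted in the activity log, keeps distinct physical streams from collapsing into one map or set entry. The sketch below uses hypothetical `ValueHandler`, `IdentityHandler`, and `IdentitySets` classes. It assumes a value-based `equals` keyed on a stream name purely to illustrate the failure mode.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical handler with value-based equality: two live streams can look "equal". */
final class ValueHandler {
  final String streamName;

  ValueHandler(String streamName) { this.streamName = streamName; }

  @Override
  public boolean equals(Object o) {
    return o instanceof ValueHandler && ((ValueHandler) o).streamName.equals(streamName);
  }

  @Override
  public int hashCode() { return streamName.hashCode(); }
}

/** Hypothetical handler whose equality is pinned to object identity. */
final class IdentityHandler {
  final String streamName;

  IdentityHandler(String streamName) { this.streamName = streamName; }

  @Override
  public boolean equals(Object o) { return this == o; }

  @Override
  public int hashCode() { return System.identityHashCode(this); }
}

/** Alternative: leave equals() alone and use an identity-based collection instead. */
final class IdentitySets {
  static <T> Set<T> newIdentitySet() {
    return Collections.newSetFromMap(new IdentityHashMap<>());
  }
}
```

Overriding `equals`/`hashCode` protects every collection the handler is stored in, while an identity map protects only the collection that uses it.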
   
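The shared `DIRECT_STREAM_SCHEDULED_EXECUTOR_SERVICE` in `GrpcWindmillStreamFactory` can be pictured as a lazily created, process-wide scheduler. This is a sketch under stated assumptions: `SharedSchedulerHolder`, the pool size of 2, the daemon thread factory, and the hand-rolled `memoize` helper are all hypothetical. Guava's `Suppliers.memoize` provides the same create-once behavior where Guava is available.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder for a lazily created scheduler shared by all direct streams. */
final class SharedSchedulerHolder {
  private static final AtomicInteger THREAD_COUNT = new AtomicInteger();

  // Daemon threads, so an idle shared pool never keeps the JVM alive.
  private static final ThreadFactory DAEMON_FACTORY =
      r -> {
        Thread t = new Thread(r, "direct-stream-scheduler-" + THREAD_COUNT.incrementAndGet());
        t.setDaemon(true);
        return t;
      };

  /** Minimal thread-safe memoizing supplier using double-checked locking. */
  static <T> Supplier<T> memoize(Supplier<T> delegate) {
    return new Supplier<T>() {
      private volatile T value;

      @Override
      public T get() {
        T result = value;
        if (result == null) {
          synchronized (this) {
            result = value;
            if (result == null) {
              result = delegate.get();
              value = result;
            }
          }
        }
        return result;
      }
    };
  }

  // Created on first use; every caller then shares the same pool. Pool size is an assumption.
  static final Supplier<ScheduledExecutorService> DIRECT_STREAM_SCHEDULED_EXECUTOR_SERVICE =
      memoize(() -> Executors.newScheduledThreadPool(2, DAEMON_FACTORY));
}
```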
   <details>
   <summary><b>Activity</b></summary>
   
   * On July 3rd, `scwhittle` (author) opened the pull request and requested 
initial reviews from `arunpandianp` and `m-trieu`, noting that more tests were 
still needed.
   * The `github-actions[bot]` subsequently stopped reviewer notifications.
   * On August 5th, `scwhittle` re-requested a review from `arunpandianp`, 
indicating that `GrpcGetDataStream` tests were complete, but `CommitStream` 
tests were still in progress.
   * `arunpandianp` acknowledged the review request on August 6th.
   * On August 7th, `scwhittle` confirmed that `CommitStream` tests were 
finished but reported new failures in existing tests, which they were 
investigating.
   * During the review process, `arunpandianp` suggested using 
`IdentityHashMap` for the stream registry, which `scwhittle` implemented.
   * A discussion took place regarding the placement of `startStream()` calls, 
where `scwhittle` provided a detailed explanation for the chosen approach.
   * `arunpandianp` also suggested extracting repeated code into a 
`clearPhysicalStream()` function, which `scwhittle` completed.
   </details>
   
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
