gemini-code-assist[bot] commented on PR #35523:
URL: https://github.com/apache/beam/pull/35523#issuecomment-3163387048
## Summary of Changes
This pull request makes Dataflow streaming more robust by supporting dynamic
physical stream transitions within a logical stream. The system can now
gracefully manage the underlying gRPC connections, proactively cycle streams,
and recover from transient failures by re-flushing pending requests onto new
connections, which improves the reliability of long-running streaming jobs.
### Highlights
* **Physical Stream Handover**: Implemented a core mechanism within
`AbstractWindmillStream` to gracefully transition between physical gRPC streams
(e.g., half-closing an old connection and establishing a new one) within the
same logical stream. This allows for proactive stream cycling and improved
resilience for long-lived streaming connections.
* **Automatic Request Re-flushing**: Enhanced the stream handling to
automatically re-send any pending requests from a half-closed or failed
physical stream onto the newly established physical stream, ensuring that no
data is lost during transitions or transient connection issues.
* **Configurable Stream Management**: Introduced new parameters in
`AbstractWindmillStream` and `GrpcWindmillStreamFactory` to configure the
duration after which a physical stream should be half-closed
(`halfClosePhysicalStreamAfter`) and to provide a `ScheduledExecutorService`
for managing these transitions, offering greater control over stream lifecycle.
* **Comprehensive Test Coverage**: Added extensive new unit tests across
`GrpcCommitWorkStreamTest` and `GrpcGetDataStreamTest` to validate the complex
scenarios involving multiple physical streams, handovers, stream failures (both
old and new streams), and proper shutdown/half-close behavior. A custom
`TriggeredScheduledExecutorService` was introduced to enable deterministic
testing of asynchronous stream events.
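
The handover and re-flush behaviour described in the highlights can be illustrated with a small in-memory model. This is a minimal sketch, not the code in the PR: `LogicalStreamSketch`, `PhysicalStream`, `handover()`, and `onFailure()` are hypothetical names, requests are plain strings, and the timer that would normally trigger the handover is left out so the example stays deterministic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of one logical stream backed by replaceable physical streams. */
final class LogicalStreamSketch {

  /** Stand-in for a physical gRPC stream; it only remembers unacknowledged requests. */
  static final class PhysicalStream {
    final Map<Long, String> unacked = new LinkedHashMap<>();
    boolean halfClosed;

    void send(long id, String request) {
      if (halfClosed) {
        throw new IllegalStateException("request sent after half-close");
      }
      unacked.put(id, request);
    }
  }

  private PhysicalStream current = new PhysicalStream();
  private long nextId;

  /** Sends a request on whichever physical stream is current and returns its id. */
  synchronized long send(String request) {
    long id = nextId++;
    current.send(id, request);
    return id;
  }

  synchronized PhysicalStream current() {
    return current;
  }

  /**
   * Half-closes the current physical stream and opens a replacement. The old stream keeps its
   * unacknowledged requests because it may still deliver their responses.
   */
  synchronized PhysicalStream handover() {
    PhysicalStream old = current;
    old.halfClosed = true;
    current = new PhysicalStream();
    return old;
  }

  /** Records a response that arrived on the given physical stream. */
  synchronized void onResponse(PhysicalStream stream, long id) {
    stream.unacked.remove(id);
  }

  /** Re-flushes whatever the failed stream never got answered onto the current stream. */
  synchronized void onFailure(PhysicalStream failed) {
    if (failed == current) {
      current = new PhysicalStream();
    }
    for (Map.Entry<Long, String> entry : failed.unacked.entrySet()) {
      current.send(entry.getKey(), entry.getValue());
    }
    failed.unacked.clear();
  }
}
```

In this model a request answered on the old stream before it fails is not re-sent, while anything still outstanding moves to the replacement stream. A real implementation would also need backoff and shutdown handling, which this sketch ignores.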
<details>
<summary><b>Changelog</b></summary>
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java**
* Changed `executor` field type from `ExecutorService` to
`ScheduledExecutorService` to support scheduled tasks.
* Added new fields: `halfClosePhysicalStreamAfter` (Duration) for
configuring stream handover timing, `halfCloseFuture` (Future) to manage
scheduled half-close tasks, and `closingPhysicalStreams` (Set) to track
physical streams that are in the process of closing.
* Modified the constructor to accept `halfClosePhysicalStreamAfter` and
`ScheduledExecutorService` parameters.
* Refactored `onNewStream()` into `onFlushPending(boolean isNewStream)`
to handle both initial stream setup and re-flushing of pending requests onto
new streams.
* Updated `startStream()` logic to schedule
`onHalfClosePhysicalStreamTimeout` when `halfClosePhysicalStreamAfter` is
configured, enabling proactive stream cycling.
* Modified `halfClose()` and `shutdown()` methods to correctly manage
the state of multiple physical streams and ensure all pending work is handled
before full termination.
* Removed the `maybeTearDownStream` helper method, integrating its logic
into `onPhysicalStreamCompletion`.
* Introduced `completeShutdown()` to centralize the final steps of
stream termination.
* Added `onHalfClosePhysicalStreamTimeout` to handle the scheduled
half-closing of physical streams.
* Updated `onPhysicalStreamCompletion` to manage
`closingPhysicalStreams` and trigger `onFlushPending` for active streams.
* Added `clearCurrentPhysicalStream` helper method for consistent state
cleanup.
* Overrode `equals` and `hashCode` methods in `PhysicalStreamHandler` to
use identity-based comparisons for stream instances.
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java**
* Removed the default `setBudget(long newItems, long newBytes)` method
from the `GetWorkStream` interface, simplifying the interface.
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java**
* Updated the constructor to pass the new `halfClosePhysicalStreamAfter`
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream`
superclass.
* Modified `onFlushPending` to prevent resending requests that are
already active on physical streams that are in the process of being half-closed.
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java**
* Updated the constructor to pass the new `halfClosePhysicalStreamAfter`
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream`
superclass.
* Modified `onFlushPending` to only send initial requests when a new
stream is established.
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java**
* Updated the constructor to pass the new `halfClosePhysicalStreamAfter`
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream`
superclass.
* Modified `onFlushPending` to only send initial requests when a new
stream is established.
* Enhanced `shutdownInternal` to ensure pending requests on all
`closingPhysicalStreams` are properly cancelled.
* Updated a comment in `trySendBatch` to reflect the new
`onFlushPending` behavior for handling finalized batches.
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java**
* Updated the constructor to pass the new `halfClosePhysicalStreamAfter`
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream`
superclass.
* Modified `onFlushPending` to only send initial requests when a new
stream is established.
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java**
* Updated the constructor to pass the new `halfClosePhysicalStreamAfter`
and `ScheduledExecutorService` parameters to the `AbstractWindmillStream`
superclass.
* Modified `onFlushPending` to only send initial requests when a new
stream is established.
* **runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java**
* Introduced
`DEFAULT_DIRECT_STREAMING_RPC_PHYSICAL_STREAM_HALF_CLOSE_AFTER` constant for
default handover duration.
* Added a memoized `DIRECT_STREAM_SCHEDULED_EXECUTOR_SERVICE` (a shared
thread pool) for direct streaming RPCs.
* Added `directStreamingRpcPhysicalStreamHalfCloseAfter` and `executor`
fields to the factory to manage new stream parameters.
* Updated the factory constructor and `create` method to accept and
propagate the new `halfClosePhysicalStreamAfter` and `ScheduledExecutorService`
parameters.
* Added `setDirectStreamingRpcPhysicalStreamHalfCloseAfter` and
`setScheduledExecutorService` methods to the `Builder` for configuration.
* Modified all `create*Stream` methods to pass the new `Duration` and
`ScheduledExecutorService` parameters to the respective stream constructors.
* Introduced `executorForDispatchedStreams` and
`executorForDirectStreams` helper methods to provide appropriate
`ScheduledExecutorService` instances based on stream type.
* Ensured direct stream stubs (`createDirectGetWorkStream`,
`createDirectGetDataStream`, `createDirectCommitWorkStream`) use
`withDefaultDeadline`.
* **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java**
* Reduced sleep/wait times in `awaitTermination` and
`waitForDroppedCommits` for faster test execution and increased sensitivity to
timing issues.
* **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java**
* Reduced sleep/wait times in `testKeyCommitTooLargeException` and
`testExceptions` for faster test execution.
* **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java**
* Updated `TestStream` constructor to accept `Duration` and
`ScheduledExecutorService` to align with `AbstractWindmillStream` changes.
* Renamed `onNewStream()` to `onFlushPending(boolean isNewStream)` and
updated its internal logic to reflect the new behavior.
* Integrated `TriggeredScheduledExecutorService` into `TestStream` for
controlled testing of scheduled events.
* **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/TriggeredScheduledExecutorService.java**
* **New file:** Implements a custom `ScheduledExecutorService` designed
for testing asynchronous operations. It allows test code to manually trigger
scheduled tasks using `unblockNextFuture()`, providing deterministic control
over timing-dependent logic.
* **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/FakeWindmillGrpcService.java**
* Added `failConnectionsAndWait` method to simulate transient connection
failures for testing purposes.
* Introduced `noMoreStreamsExpected` and `failedStreamConnectsRemaining`
fields to control and track expected stream connections during tests.
* Enhanced `StreamInfoObserver`'s `onNext` method to throw an
`IllegalStateException` if requests are sent on a stream that has already been
half-closed or fully closed, improving test robustness.
* **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java**
* Added extensive new test cases
(`testCommitWorkItem_multiplePhysicalStreams`, `_OldStreamFails`,
`_newStreamFailsWhileEmpty`, `_newStreamFailsWithRequests`,
`_multipleHandovers`, `_shutdown`, `_halfClose`) to thoroughly validate the new
physical stream transition and request re-flushing logic under various
conditions, including failures and graceful shutdowns.
* **runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java**
* Added extensive new test cases
(`testRequestKeyedData_multiplePhysicalStreams`, `_OldStreamFails`,
`_newStreamFailsWhileEmpty`, `_newStreamFailsWithRequests`,
`_multipleHandovers_allResponsesReceived`,
`_OldStreamFailsWhileNewStreamInBackoff`, `_multipleHandovers_shutdown`,
`_multipleHandovers_halfClose`) to thoroughly validate the new physical stream
transition and request re-flushing logic under various conditions, including
failures and graceful shutdowns.
</details>
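
The changelog describes a test executor whose scheduled tasks only run when the test asks for them. The idea can be shown with a much smaller stand-in. This is a rough sketch under stated assumptions: `ManualTriggerScheduler`, `schedule(Runnable)`, and `runNext()` are hypothetical names, the class does not implement `ScheduledExecutorService`, and it is not the `TriggeredScheduledExecutorService` added by the PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical scheduler that ignores delays and runs tasks only when told to. */
final class ManualTriggerScheduler {
  private final Queue<Runnable> scheduled = new ArrayDeque<>();

  /** Queues a task; nothing runs until the test calls runNext(). */
  synchronized void schedule(Runnable task) {
    scheduled.add(task);
  }

  /** Runs the oldest queued task on the caller's thread; returns false if none is queued. */
  boolean runNext() {
    Runnable task;
    synchronized (this) {
      task = scheduled.poll();
    }
    if (task == null) {
      return false;
    }
    // Run outside the lock so the task itself may schedule follow-up work.
    task.run();
    return true;
  }

  synchronized int pendingCount() {
    return scheduled.size();
  }
}
```

A test could hand such a scheduler to the code under test, let it queue its half-close timeout, and then call `runNext()` at the exact point where the handover should happen, instead of sleeping and hoping the timer fires in time.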
<details>
<summary><b>Activity</b></summary>
* On July 3rd, `scwhittle` (author) opened the pull request and requested
initial reviews from `arunpandianp` and `m-trieu`, noting that more tests were
still needed.
* The `github-actions[bot]` subsequently stopped reviewer notifications.
* On August 5th, `scwhittle` re-requested a review from `arunpandianp`,
indicating that `GrpcGetDataStream` tests were complete, but `CommitStream`
tests were still in progress.
* `arunpandianp` acknowledged the review request on August 6th.
* On August 7th, `scwhittle` confirmed that `CommitStream` tests were
finished but reported new failures in existing tests, which they were
investigating.
* During the review process, `arunpandianp` suggested using
`IdentityHashMap` for the stream registry, which `scwhittle` implemented.
* A discussion took place regarding the placement of `startStream()` calls,
where `scwhittle` provided a detailed explanation for the chosen approach.
* `arunpandianp` also suggested extracting repeated code into a
`clearPhysicalStream()` function, which `scwhittle` completed.
</details>
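
For readers unfamiliar with identity-based tracking, the following sketch shows why it matters when several stream handlers could otherwise compare as equal. The names `IdentitySetSketch`, `Handler`, and `newIdentitySet()` are hypothetical, and the summary above does not settle whether the PR relies on an identity map, identity-based `equals`/`hashCode`, or both; this only demonstrates the standard-library mechanism.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical illustration of tracking objects by reference rather than by equals(). */
final class IdentitySetSketch {

  /** Handlers with the same name are equal by value but are still distinct streams. */
  static final class Handler {
    final String name;

    Handler(String name) {
      this.name = name;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof Handler && ((Handler) other).name.equals(name);
    }

    @Override
    public int hashCode() {
      return name.hashCode();
    }
  }

  /** Builds a set that compares members with == by backing it with an IdentityHashMap. */
  static <T> Set<T> newIdentitySet() {
    return Collections.newSetFromMap(new IdentityHashMap<>());
  }
}
```

With a set built this way, two handlers that are equal by value are still tracked as separate entries, so removing one closing stream cannot accidentally drop another.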