This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ba714221d5e organize and refactor GrpcWindmillServer. (#29156)
ba714221d5e is described below
commit ba714221d5efaea58a59bccaad2eaeef70bd4ec4
Author: martin trieu <[email protected]>
AuthorDate: Tue Oct 31 02:37:10 2023 -0700
organize and refactor GrpcWindmillServer. (#29156)
organize and refactor GrpcWindmillServer to prepare for Streaming Engine
Client changes.
---
.../google-cloud-dataflow-java/worker/build.gradle | 3 +
.../worker/MetricTrackingWindmillServerStub.java | 4 +-
.../dataflow/worker/StreamingDataflowWorker.java | 6 +-
.../options/StreamingDataflowWorkerOptions.java | 2 +-
.../worker/windmill/WindmillServerBase.java | 8 +-
.../worker/windmill/WindmillServerStub.java | 8 +-
.../{ => client}/AbstractWindmillStream.java | 3 +-
.../windmill/{ => client}/WindmillStream.java | 21 +-
.../windmill/{ => client}/WindmillStreamPool.java | 2 +-
.../grpc}/AppendableInputStream.java | 2 +-
.../grpc}/GetWorkTimingInfosTracker.java | 2 +-
.../grpc}/GrpcCommitWorkStream.java | 9 +-
.../grpc}/GrpcDeadlineClientInterceptor.java | 2 +-
.../windmill/client/grpc/GrpcDispatcherClient.java | 136 +++++
.../grpc}/GrpcGetDataStream.java | 13 +-
.../grpc}/GrpcGetDataStreamRequests.java | 2 +-
.../grpc}/GrpcGetWorkStream.java | 29 +-
.../grpc}/GrpcGetWorkerMetadataStream.java | 9 +-
.../windmill/client/grpc/GrpcWindmillServer.java | 355 ++++++++++++
.../client/grpc/GrpcWindmillStreamFactory.java | 227 ++++++++
.../grpc/auth/VendoredCredentialsAdapter.java | 81 +++
.../VendoredRequestMetadataCallbackAdapter.java | 51 ++
.../grpc/observers}/DirectStreamObserver.java | 2 +-
.../ForwardingClientResponseObserver.java | 2 +-
.../grpc/observers}/StreamObserverFactory.java | 2 +-
.../client/grpc/stubs/WindmillChannelFactory.java | 137 +++++
.../client/grpc/stubs/WindmillStubFactory.java | 73 +++
.../throttling/StreamingEngineThrottleTimers.java | 41 ++
.../throttling}/ThrottleTimer.java | 6 +-
.../windmill/grpcclient/GrpcWindmillServer.java | 607 ---------------------
.../worker/windmill/work/WorkItemReceiver.java | 34 ++
.../worker/windmill/work/budget/GetWorkBudget.java | 98 ++++
.../dataflow/worker/FakeWindmillServer.java | 24 +-
.../{ => client}/WindmillStreamPoolTest.java | 2 +-
.../grpc}/GrpcGetWorkerMetadataStreamTest.java | 9 +-
.../grpc}/GrpcWindmillServerTest.java | 15 +-
.../windmill/work/budget/GetWorkBudgetTest.java | 72 +++
37 files changed, 1413 insertions(+), 686 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle
b/runners/google-cloud-dataflow-java/worker/build.gradle
index ce06063c9b5..1ca9eba2b48 100644
--- a/runners/google-cloud-dataflow-java/worker/build.gradle
+++ b/runners/google-cloud-dataflow-java/worker/build.gradle
@@ -89,6 +89,9 @@ applyJavaNature(
// Allow slf4j implementation worker for logging during
pipeline execution
"org/slf4j/impl/**"
],
+ generatedClassPatterns: [
+ /^org\.apache\.beam\.runners\.dataflow\.worker\.windmill.*/
+ ],
shadowClosure: {
// Each included dependency must also include all of its necessary
transitive dependencies
// or have them provided by the users pipeline during job
submission. Typically a users
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
index 33b55647213..0e929249b3a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
@@ -29,8 +29,8 @@ import
org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamPool;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 811250ee785..11849e8b8c4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -104,9 +104,9 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamPool;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.sdk.coders.Coder;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
index cc5b3302b01..bacfa1eef63 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
-import
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
index fe81eece138..8caa79cd3f7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.dataflow.worker.windmill;
import java.io.IOException;
import java.util.Set;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
/**
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index 1bb5359e06f..c327e68d7e9 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -21,10 +21,10 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.Set;
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
/** Stub for communicating with a Windmill server. */
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
similarity index 98%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index ea7efff7a06..4e47676989a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
import java.io.IOException;
import java.io.PrintWriter;
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
similarity index 84%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
index 4dd4164fc4e..fa1f797a191 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
@@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.joda.time.Instant;
/** Superclass for streams returned by streaming Windmill methods. */
@@ -41,16 +41,11 @@ public interface WindmillStream {
/** Handle representing a stream of GetWork responses. */
@ThreadSafe
interface GetWorkStream extends WindmillStream {
- /** Functional interface for receiving WorkItems. */
- @FunctionalInterface
- interface WorkItemReceiver {
- void receiveWork(
- String computation,
- @Nullable Instant inputDataWatermark,
- @Nullable Instant synchronizedProcessingTime,
- Windmill.WorkItem workItem,
- Collection<Windmill.LatencyAttribution> getWorkStreamLatencies);
- }
+ /** Adjusts the {@link GetWorkBudget} for the stream. */
+ void adjustBudget(long itemsDelta, long bytesDelta);
+
+ /** Returns the remaining in-flight {@link GetWorkBudget}. */
+ GetWorkBudget remainingBudget();
}
/** Interface for streaming GetDataRequests to Windmill. */
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java
similarity index 99%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java
index 9cd4ab0ea4a..9f1b67edc1e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
import java.util.ArrayList;
import java.util.HashMap;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java
similarity index 98%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java
index dbd3613ee4c..6a0d0a63d5a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import java.io.IOException;
import java.io.InputStream;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
similarity index 99%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
index e6710993af9..221b18be164 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import java.util.ArrayList;
import java.util.Collection;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
similarity index 96%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
index 1bba40805de..5d0a5085fe1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -27,15 +27,16 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitWorkRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java
similarity index 97%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java
index 6b0e19cbb48..629006e2359 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
new file mode 100644
index 00000000000..ef9156f9c05
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST;
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Manages the Windmill Dispatcher endpoints and the stubs created for them; the mutable stub and endpoint collections are guarded by the instance lock. */
+@ThreadSafe
+class GrpcDispatcherClient {
+ private static final Logger LOG =
LoggerFactory.getLogger(GrpcDispatcherClient.class);
+ private final WindmillStubFactory windmillStubFactory;
+
+ @GuardedBy("this")
+ private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
+
+ @GuardedBy("this")
+ private final Set<HostAndPort> dispatcherEndpoints;
+
+ @GuardedBy("this")
+ private final Random rand;
+
+ private GrpcDispatcherClient(
+ WindmillStubFactory windmillStubFactory,
+ List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+ Set<HostAndPort> dispatcherEndpoints,
+ Random rand) {
+ this.windmillStubFactory = windmillStubFactory;
+ this.dispatcherStubs = dispatcherStubs;
+ this.dispatcherEndpoints = dispatcherEndpoints;
+ this.rand = rand;
+ }
+
+ static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) { // Starts with no endpoints or stubs, so isReady() is false until endpoints are consumed.
+ return new GrpcDispatcherClient(
+ windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+ }
+
+ @VisibleForTesting
+ static GrpcDispatcherClient forTesting(
+ WindmillStubFactory windmillGrpcStubFactory,
+ List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+ Set<HostAndPort> dispatcherEndpoints) { // Requires exactly as many stubs as endpoints.
+ Preconditions.checkArgument(dispatcherEndpoints.size() ==
dispatcherStubs.size());
+ return new GrpcDispatcherClient(
+ windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new
Random());
+ }
+
+ synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() { // Returns a randomly chosen dispatcher stub; the checkState below fails when no stubs exist yet.
+ Preconditions.checkState(
+ !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been
set");
+
+ return (dispatcherStubs.size() == 1
+ ? dispatcherStubs.get(0)
+ : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+ }
+
+ synchronized boolean isReady() { // True when at least one dispatcher stub is available.
+ return !dispatcherStubs.isEmpty();
+ }
+
+ synchronized void consumeWindmillDispatcherEndpoints(
+ ImmutableSet<HostAndPort> dispatcherEndpoints) { // Rejects null or empty sets; rebuilds the stubs only when the endpoints differ from the current ones.
+ Preconditions.checkArgument(
+ dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
+ "Cannot set dispatcher endpoints to nothing.");
+ if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) {
+ // The endpoints are unchanged, so keep the existing stubs instead of recreating them.
+ return;
+ }
+
+ LOG.info("Creating a new windmill stub, endpoints: {}",
dispatcherEndpoints);
+ if (!this.dispatcherEndpoints.isEmpty()) {
+ LOG.info("Previous windmill stub endpoints: {}",
this.dispatcherEndpoints);
+ }
+
+ resetDispatcherEndpoints(dispatcherEndpoints);
+ }
+
+ private synchronized void resetDispatcherEndpoints(
+ ImmutableSet<HostAndPort> newDispatcherEndpoints) { // Drops every existing stub and endpoint, then creates one stub per new endpoint.
+ LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}",
newDispatcherEndpoints);
+ this.dispatcherStubs.clear();
+ this.dispatcherEndpoints.clear();
+ this.dispatcherEndpoints.addAll(newDispatcherEndpoints);
+
+ dispatcherEndpoints.stream()
+ .map(this::createDispatcherStubForWindmillService)
+ .forEach(dispatcherStubs::add);
+ }
+
+ private CloudWindmillServiceV1Alpha1Stub
createDispatcherStubForWindmillService(
+ HostAndPort endpoint) {
+ if (LOCALHOST.equals(endpoint.getHost())) { // Localhost endpoints bypass the stub factory and use a localhost channel on the endpoint port.
+ return
CloudWindmillServiceV1Alpha1Grpc.newStub(localhostChannel(endpoint.getPort()));
+ }
+
+ // An IN_PROCESS stub factory (used when testing) supplies its stub directly; otherwise a remote stub is built for the endpoint address.
+ return windmillStubFactory.getKind() == WindmillStubFactory.Kind.IN_PROCESS
+ ? windmillStubFactory.inProcess().get()
+ :
windmillStubFactory.remote().apply(WindmillServiceAddress.create(endpoint));
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
similarity index 95%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
index 238cc771dce..ea9cd7f0fa3 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify;
@@ -33,8 +33,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
@@ -43,9 +41,12 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataReq
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcGetDataStreamRequests.QueuedBatch;
-import
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcGetDataStreamRequests.QueuedRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.joda.time.Instant;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java
similarity index 98%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java
index 7da7b13958b..cda9537127d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import com.google.auto.value.AutoOneOf;
import java.util.ArrayList;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
similarity index 89%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
index 4660fe25b13..d7d9bfddffb 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import java.io.IOException;
import java.io.PrintWriter;
@@ -27,16 +27,18 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequestExtension;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
@@ -44,7 +46,7 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class GrpcGetWorkStream
+public final class GrpcGetWorkStream
extends AbstractWindmillStream<StreamingGetWorkRequest,
StreamingGetWorkResponseChunk>
implements GetWorkStream {
@@ -79,7 +81,7 @@ final class GrpcGetWorkStream
this.inflightBytes = new AtomicLong();
}
- static GrpcGetWorkStream create(
+ public static GrpcGetWorkStream create(
Function<
StreamObserver<StreamingGetWorkResponseChunk>,
StreamObserver<StreamingGetWorkRequest>>
@@ -190,6 +192,19 @@ final class GrpcGetWorkStream
getWorkThrottleTimer.start();
}
+ /**
+  * Budget-adjustment override; this implementation currently ignores both {@code itemsDelta} and
+  * {@code bytesDelta} and does nothing.
+  */
+ @Override
+ public void adjustBudget(long itemsDelta, long bytesDelta) {
+ // no-op
+ }
+
+ /**
+  * Builds a {@link GetWorkBudget} from the request limits minus the current values of the
+  * {@code inflightBytes} and {@code inflightMessages} counters.
+  *
+  * <p>NOTE(review): the subtraction is not clamped, so a component could presumably be negative if
+  * a counter ever exceeds the corresponding request limit; confirm callers tolerate that.
+  */
+ @Override
+ public GetWorkBudget remainingBudget() {
+ return GetWorkBudget.builder()
+ .setBytes(request.getMaxBytes() - inflightBytes.get())
+ .setItems(request.getMaxItems() - inflightMessages.get())
+ .build();
+ }
+
private class WorkItemBuffer {
private final GetWorkTimingInfosTracker workTimingInfosTracker;
private String computation;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java
similarity index 93%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java
index 427fd412ec7..a403feddb45 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.PrintWriter;
@@ -23,13 +23,14 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
new file mode 100644
index 00000000000..3a881df7146
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST;
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.inProcessChannel;
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** gRPC client for communicating with Streaming Engine. */
+@SuppressFBWarnings({
+ // Very likely real potentials for bugs.
+ "JLM_JSR166_UTILCONCURRENT_MONITORENTER", //
https://github.com/apache/beam/issues/19273
+ "IS2_INCONSISTENT_SYNC" // https://github.com/apache/beam/issues/19271
+})
+@SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497)
+public final class GrpcWindmillServer extends WindmillServerStub {
+ private static final Logger LOG =
LoggerFactory.getLogger(GrpcWindmillServer.class);
+ private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20;
+ private static final Duration MIN_BACKOFF = Duration.millis(1);
+ private static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
+ private static final int NO_HEALTH_CHECK = -1;
+ private static final String GRPC_LOCALHOST = "grpc:localhost";
+
+ private final GrpcWindmillStreamFactory windmillStreamFactory;
+ private final GrpcDispatcherClient dispatcherClient;
+ private final StreamingDataflowWorkerOptions options;
+ private final StreamingEngineThrottleTimers throttleTimers;
+ private Duration maxBackoff;
+ private @Nullable WindmillApplianceGrpc.WindmillApplianceBlockingStub
syncApplianceStub;
+
+ private GrpcWindmillServer(
+ StreamingDataflowWorkerOptions options, GrpcDispatcherClient
grpcDispatcherClient) {
+ this.options = options;
+ this.throttleTimers = StreamingEngineThrottleTimers.create();
+ this.maxBackoff = MAX_BACKOFF;
+ this.windmillStreamFactory =
+ GrpcWindmillStreamFactory.of(
+ JobHeader.newBuilder()
+ .setJobId(options.getJobId())
+ .setProjectId(options.getProject())
+ .setWorkerId(options.getWorkerId())
+ .build())
+ .setWindmillMessagesBetweenIsReadyChecks(
+ options.getWindmillMessagesBetweenIsReadyChecks())
+ .setMaxBackOffSupplier(() -> maxBackoff)
+ .setLogEveryNStreamFailures(
+ options.getWindmillServiceStreamingLogEveryNStreamFailures())
+
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
+ .build();
+ windmillStreamFactory.scheduleHealthChecks(
+ options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
+
+ this.dispatcherClient = grpcDispatcherClient;
+ this.syncApplianceStub = null;
+ }
+
+ private static StreamingDataflowWorkerOptions testOptions(boolean
enableStreamingEngine) {
+ StreamingDataflowWorkerOptions options =
+
PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class);
+ options.setProject("project");
+ options.setJobId("job");
+ options.setWorkerId("worker");
+ List<String> experiments =
+ options.getExperiments() == null ? new ArrayList<>() :
options.getExperiments();
+ if (enableStreamingEngine) {
+ experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+ }
+ options.setExperiments(experiments);
+
+ options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE);
+ options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK);
+
options.setWindmillServiceStreamingLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_FAILURES);
+
+ return options;
+ }
+
+ /** Create new instance of {@link GrpcWindmillServer}. */
+ public static GrpcWindmillServer create(StreamingDataflowWorkerOptions
workerOptions)
+ throws IOException {
+
+ GrpcWindmillServer grpcWindmillServer =
+ new GrpcWindmillServer(
+ workerOptions,
+ GrpcDispatcherClient.create(
+ WindmillStubFactory.remoteStubFactory(
+
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
+ workerOptions.getGcpCredential())));
+ if (workerOptions.getWindmillServiceEndpoint() != null) {
+ grpcWindmillServer.configureWindmillServiceEndpoints();
+ } else if (!workerOptions.isEnableStreamingEngine()
+ && workerOptions.getLocalWindmillHostport() != null) {
+ grpcWindmillServer.configureLocalHost();
+ }
+
+ return grpcWindmillServer;
+ }
+
+ @VisibleForTesting
+ static GrpcWindmillServer newTestInstance(String name) {
+ ManagedChannel inProcessChannel = inProcessChannel(name);
+ CloudWindmillServiceV1Alpha1Stub stub =
+ CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel);
+ List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs =
Lists.newArrayList(stub);
+ Set<HostAndPort> dispatcherEndpoints =
Sets.newHashSet(HostAndPort.fromHost(name));
+ GrpcDispatcherClient dispatcherClient =
+ GrpcDispatcherClient.forTesting(
+ WindmillStubFactory.inProcessStubFactory(name, unused ->
inProcessChannel),
+ dispatcherStubs,
+ dispatcherEndpoints);
+ return new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */
true), dispatcherClient);
+ }
+
+ @VisibleForTesting
+ static GrpcWindmillServer newApplianceTestInstance(Channel channel) {
+ GrpcWindmillServer testServer =
+ new GrpcWindmillServer(
+ testOptions(/* enableStreamingEngine= */ false),
+ // No-op, Appliance does not use Dispatcher to call Streaming
Engine.
+
GrpcDispatcherClient.create(WindmillStubFactory.inProcessStubFactory("test")));
+ testServer.syncApplianceStub =
createWindmillApplianceStubWithDeadlineInterceptor(channel);
+ return testServer;
+ }
+
+ private static WindmillApplianceGrpc.WindmillApplianceBlockingStub
+ createWindmillApplianceStubWithDeadlineInterceptor(Channel channel) {
+ return WindmillApplianceGrpc.newBlockingStub(channel)
+
.withInterceptors(GrpcDeadlineClientInterceptor.withDefaultUnaryRpcDeadline());
+ }
+
+ private static UnsupportedOperationException
unsupportedUnaryRequestInStreamingEngineException(
+ String rpcName) {
+ return new UnsupportedOperationException(
+ String.format("Unary %s calls are not supported in Streaming Engine.",
rpcName));
+ }
+
+ private void configureWindmillServiceEndpoints() {
+ Set<HostAndPort> endpoints = new HashSet<>();
+ for (String endpoint :
Splitter.on(',').split(options.getWindmillServiceEndpoint())) {
+ endpoints.add(
+
HostAndPort.fromString(endpoint).withDefaultPort(options.getWindmillServicePort()));
+ }
+
+
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
+ }
+
+ /**
+  * Points this client at a Windmill instance on localhost, using the {@code localWindmillHostport}
+  * option, which must have the form {@code grpc:localhost:<port>}.
+  *
+  * @throws IllegalStateException if the option has no {@code ':'} port separator or its host part
+  *     is not {@code grpc:localhost}
+  * @throws NumberFormatException if the port part is not a valid integer
+  */
+ private void configureLocalHost() {
+   String hostport = options.getLocalWindmillHostport();
+   int portStart = hostport.lastIndexOf(':');
+   // Without this check a missing ':' surfaces as an opaque StringIndexOutOfBoundsException
+   // from substring(0, -1) below.
+   Preconditions.checkState(
+       portStart >= 0,
+       "Expected localWindmillHostport of the form %s:<port>, but was: %s",
+       GRPC_LOCALHOST,
+       hostport);
+   String endpoint = hostport.substring(0, portStart);
+   Preconditions.checkState(
+       GRPC_LOCALHOST.equals(endpoint),
+       "Expected localWindmillHostport of the form %s:<port>, but was: %s",
+       GRPC_LOCALHOST,
+       hostport);
+   int port = Integer.parseInt(hostport.substring(portStart + 1));
+   dispatcherClient.consumeWindmillDispatcherEndpoints(
+       ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port)));
+   initializeLocalHost(port);
+ }
+
+ @Override
+ public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
+
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
+ }
+
+ @Override
+ public boolean isReady() {
+ return dispatcherClient.isReady();
+ }
+
+ /**
+  * Applies localhost-specific settings: lowers {@code maxBackoff} to 500ms, then either registers
+  * the localhost endpoint with the dispatcher client (Streaming Engine enabled) or creates the
+  * blocking Appliance stub over a localhost channel (Streaming Engine disabled).
+  *
+  * @param port the localhost port to connect to
+  */
+ private synchronized void initializeLocalHost(int port) {
+ this.maxBackoff = Duration.millis(500);
+ if (options.isEnableStreamingEngine()) {
+ dispatcherClient.consumeWindmillDispatcherEndpoints(
+ ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port)));
+ } else {
+ this.syncApplianceStub =
+
createWindmillApplianceStubWithDeadlineInterceptor(localhostChannel(port));
+ }
+ }
+
+ @Override
+ public void appendSummaryHtml(PrintWriter writer) {
+ windmillStreamFactory.appendSummaryHtml(writer);
+ }
+
+ /**
+  * Invokes {@code function}, retrying with exponential backoff each time it fails with a {@link
+  * StatusRuntimeException}. Every 20th failure is logged. If the backoff is exhausted, or waiting
+  * for the next attempt fails, the last gRPC failure is rethrown wrapped in an {@code
+  * RpcException}.
+  */
+ private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
+   // Retry policy: start at MIN_BACKOFF and never wait longer than the current maxBackoff.
+   BackOff retryBackoff =
+       FluentBackoff.DEFAULT.withInitialBackoff(MIN_BACKOFF).withMaxBackoff(maxBackoff).backoff();
+
+   int failureCount = 0;
+   for (; ; ) {
+     try {
+       return function.get();
+     } catch (StatusRuntimeException rpcFailure) {
+       failureCount++;
+       if (failureCount % 20 == 0) {
+         LOG.warn(
+             "Many exceptions calling gRPC. Last exception: {} with status {}",
+             rpcFailure,
+             rpcFailure.getStatus());
+       }
+
+       boolean shouldRetry;
+       try {
+         shouldRetry = BackOffUtils.next(Sleeper.DEFAULT, retryBackoff);
+       } catch (InterruptedException interrupted) {
+         // Restore the interrupt flag before surfacing the failure.
+         Thread.currentThread().interrupt();
+         RpcException wrapped = new RpcException(rpcFailure);
+         wrapped.addSuppressed(interrupted);
+         throw wrapped;
+       } catch (IOException backoffFailure) {
+         RpcException wrapped = new RpcException(rpcFailure);
+         wrapped.addSuppressed(backoffFailure);
+         throw wrapped;
+       }
+
+       if (!shouldRetry) {
+         throw new RpcException(rpcFailure);
+       }
+     }
+   }
+ }
+
+ @Override
+ public GetWorkResponse getWork(GetWorkRequest request) {
+ if (syncApplianceStub != null) {
+ return callWithBackoff(() -> syncApplianceStub.getWork(request));
+ }
+
+ throw new
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork"));
+ }
+
+ @Override
+ public GetDataResponse getData(GetDataRequest request) {
+ if (syncApplianceStub != null) {
+ return callWithBackoff(() -> syncApplianceStub.getData(request));
+ }
+
+ throw new
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData"));
+ }
+
+ @Override
+ public CommitWorkResponse commitWork(CommitWorkRequest request) {
+ if (syncApplianceStub != null) {
+ return callWithBackoff(() -> syncApplianceStub.commitWork(request));
+ }
+ throw new
RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork"));
+ }
+
+ @Override
+ public GetWorkStream getWorkStream(GetWorkRequest request, WorkItemReceiver
receiver) {
+ return windmillStreamFactory.createGetWorkStream(
+ dispatcherClient.getDispatcherStub(),
+ GetWorkRequest.newBuilder(request)
+ .setJobId(options.getJobId())
+ .setProjectId(options.getProject())
+ .setWorkerId(options.getWorkerId())
+ .build(),
+ throttleTimers.getWorkThrottleTimer(),
+ receiver);
+ }
+
+ @Override
+ public GetDataStream getDataStream() {
+ return windmillStreamFactory.createGetDataStream(
+ dispatcherClient.getDispatcherStub(),
throttleTimers.getDataThrottleTimer());
+ }
+
+ @Override
+ public CommitWorkStream commitWorkStream() {
+ return windmillStreamFactory.createCommitWorkStream(
+ dispatcherClient.getDispatcherStub(),
throttleTimers.commitWorkThrottleTimer());
+ }
+
+ @Override
+ public GetConfigResponse getConfig(GetConfigRequest request) {
+ if (syncApplianceStub != null) {
+ return callWithBackoff(() -> syncApplianceStub.getConfig(request));
+ }
+
+ throw new RpcException(
+ new UnsupportedOperationException("GetConfig not supported in
Streaming Engine."));
+ }
+
+ @Override
+ public ReportStatsResponse reportStats(ReportStatsRequest request) {
+ if (syncApplianceStub != null) {
+ return callWithBackoff(() -> syncApplianceStub.reportStats(request));
+ }
+
+ throw new RpcException(
+ new UnsupportedOperationException("ReportStats not supported in
Streaming Engine."));
+ }
+
+ @Override
+ public long getAndResetThrottleTime() {
+ return throttleTimers.getAndResetThrottleTime();
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
new file mode 100644
index 00000000000..e474ebf18b2
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS;
+
+import com.google.auto.value.AutoBuilder;
+import java.io.PrintWriter;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Creates gRPC streaming connections to Windmill Service. Maintains a set of
all currently opened
+ * RPC streams for health check/heartbeat requests to keep the streams alive.
+ */
+@ThreadSafe
+public final class GrpcWindmillStreamFactory implements StatusDataProvider {
+ private static final Duration MIN_BACKOFF = Duration.millis(1);
+ private static final Duration DEFAULT_MAX_BACKOFF =
Duration.standardSeconds(30);
+ private static final int DEFAULT_LOG_EVERY_N_STREAM_FAILURES = 1;
+ private static final int DEFAULT_STREAMING_RPC_BATCH_LIMIT =
Integer.MAX_VALUE;
+ private static final int DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS =
1;
+
+ private final JobHeader jobHeader;
+ private final int logEveryNStreamFailures;
+ private final int streamingRpcBatchLimit;
+ private final int windmillMessagesBetweenIsReadyChecks;
+ private final Supplier<BackOff> grpcBackOff;
+ private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
+ private final AtomicLong streamIdGenerator;
+
+ /**
+  * Creates a factory whose streams share the given job header and tuning parameters.
+  *
+  * @param jobHeader header passed to the streams created by this factory
+  * @param logEveryNStreamFailures passed through to every created stream
+  * @param streamingRpcBatchLimit passed through to the GetData and CommitWork streams
+  * @param windmillMessagesBetweenIsReadyChecks passed to the stream observer factory
+  * @param maxBackOffSupplier supplies the maximum retry backoff; read each time a stream is
+  *     created so later changes to the underlying value are picked up
+  */
+ GrpcWindmillStreamFactory(
+     JobHeader jobHeader,
+     int logEveryNStreamFailures,
+     int streamingRpcBatchLimit,
+     int windmillMessagesBetweenIsReadyChecks,
+     Supplier<Duration> maxBackOffSupplier) {
+   this.jobHeader = jobHeader;
+   this.logEveryNStreamFailures = logEveryNStreamFailures;
+   this.streamingRpcBatchLimit = streamingRpcBatchLimit;
+   this.windmillMessagesBetweenIsReadyChecks = windmillMessagesBetweenIsReadyChecks;
+   // Configure backoff to retry calls forever, with a maximum sane retry interval.
+   // A BackOff carries per-use retry state (reset/next), so this supplier must not be memoized:
+   // each get() builds a fresh instance and no two streams share or reset each other's backoff.
+   this.grpcBackOff =
+       () ->
+           FluentBackoff.DEFAULT
+               .withInitialBackoff(MIN_BACKOFF)
+               .withMaxBackoff(maxBackOffSupplier.get())
+               .backoff();
+   this.streamRegistry = ConcurrentHashMap.newKeySet();
+   this.streamIdGenerator = new AtomicLong();
+ }
+
+ /**
+ * Returns a new {@link Builder} for {@link GrpcWindmillStreamFactory} with
default values set for
+ * the given {@link JobHeader}.
+ */
+ public static GrpcWindmillStreamFactory.Builder of(JobHeader jobHeader) {
+ return new AutoBuilder_GrpcWindmillStreamFactory_Builder()
+ .setJobHeader(jobHeader)
+
.setWindmillMessagesBetweenIsReadyChecks(DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS)
+ .setMaxBackOffSupplier(() -> DEFAULT_MAX_BACKOFF)
+ .setLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_STREAM_FAILURES)
+ .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT);
+ }
+
+ /**
+  * Returns a copy of {@code stub} whose deadline is {@code DEFAULT_STREAM_RPC_DEADLINE_SECONDS}
+  * seconds from the moment of this call.
+  */
+ private static CloudWindmillServiceV1Alpha1Stub withDeadline(
+ CloudWindmillServiceV1Alpha1Stub stub) {
+ // Deadlines are absolute points in time, so generate a new one every time this function is
+ // called.
+ return stub.withDeadlineAfter(
+ AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS,
TimeUnit.SECONDS);
+ }
+
+ public GetWorkStream createGetWorkStream(
+ CloudWindmillServiceV1Alpha1Stub stub,
+ GetWorkRequest request,
+ ThrottleTimer getWorkThrottleTimer,
+ WorkItemReceiver processWorkItem) {
+ return GrpcGetWorkStream.create(
+ responseObserver -> withDeadline(stub).getWorkStream(responseObserver),
+ request,
+ grpcBackOff.get(),
+ newStreamObserverFactory(),
+ streamRegistry,
+ logEveryNStreamFailures,
+ getWorkThrottleTimer,
+ processWorkItem);
+ }
+
+ public GetDataStream createGetDataStream(
+ CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer
getDataThrottleTimer) {
+ return GrpcGetDataStream.create(
+ responseObserver -> withDeadline(stub).getDataStream(responseObserver),
+ grpcBackOff.get(),
+ newStreamObserverFactory(),
+ streamRegistry,
+ logEveryNStreamFailures,
+ getDataThrottleTimer,
+ jobHeader,
+ streamIdGenerator,
+ streamingRpcBatchLimit);
+ }
+
+ public CommitWorkStream createCommitWorkStream(
+ CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer
commitWorkThrottleTimer) {
+ return GrpcCommitWorkStream.create(
+ responseObserver ->
withDeadline(stub).commitWorkStream(responseObserver),
+ grpcBackOff.get(),
+ newStreamObserverFactory(),
+ streamRegistry,
+ logEveryNStreamFailures,
+ commitWorkThrottleTimer,
+ jobHeader,
+ streamIdGenerator,
+ streamingRpcBatchLimit);
+ }
+
+ public GetWorkerMetadataStream createGetWorkerMetadataStream(
+ CloudWindmillServiceV1Alpha1Stub stub,
+ ThrottleTimer getWorkerMetadataThrottleTimer,
+ Consumer<WindmillEndpoints> onNewWindmillEndpoints) {
+ return GrpcGetWorkerMetadataStream.create(
+ responseObserver ->
withDeadline(stub).getWorkerMetadataStream(responseObserver),
+ grpcBackOff.get(),
+ newStreamObserverFactory(),
+ streamRegistry,
+ logEveryNStreamFailures,
+ jobHeader,
+ 0,
+ getWorkerMetadataThrottleTimer,
+ onNewWindmillEndpoints);
+ }
+
+ private StreamObserverFactory newStreamObserverFactory() {
+ return StreamObserverFactory.direct(
+ DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2,
windmillMessagesBetweenIsReadyChecks);
+ }
+
+ /**
+  * Schedules streaming RPC health checks to run on a background daemon thread, which will be
+  * cleaned up when the JVM shuts down.
+  *
+  * <p>On every tick, each stream in the registry is asked to send a health check if needed, using
+  * a threshold of "now minus one interval".
+  *
+  * @param healthCheckInterval period between health checks in milliseconds; a non-positive value
+  *     disables health checks
+  */
+ public void scheduleHealthChecks(int healthCheckInterval) {
+   // Timer#schedule rejects a non-positive period with IllegalArgumentException, so 0 is treated
+   // like a negative value: health checks disabled.
+   if (healthCheckInterval <= 0) {
+     return;
+   }
+
+   // Daemon timer, as documented above, so this thread can never keep the JVM from exiting.
+   new Timer("WindmillHealthCheckTimer", /* isDaemon= */ true)
+       .schedule(
+           new TimerTask() {
+             @Override
+             public void run() {
+               Instant reportThreshold = Instant.now().minus(Duration.millis(healthCheckInterval));
+               for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
+                 stream.maybeSendHealthCheck(reportThreshold);
+               }
+             }
+           },
+           0,
+           healthCheckInterval);
+ }
+
+ @Override
+ public void appendSummaryHtml(PrintWriter writer) {
+ writer.write("Active Streams:<br>");
+ for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
+ stream.appendSummaryHtml(writer);
+ writer.write("<br>");
+ }
+ }
+
+ @AutoBuilder(ofClass = GrpcWindmillStreamFactory.class)
+ interface Builder {
+ Builder setJobHeader(JobHeader jobHeader);
+
+ Builder setLogEveryNStreamFailures(int logEveryNStreamFailures);
+
+ Builder setStreamingRpcBatchLimit(int streamingRpcBatchLimit);
+
+ Builder setWindmillMessagesBetweenIsReadyChecks(int
windmillMessagesBetweenIsReadyChecks);
+
+ Builder setMaxBackOffSupplier(Supplier<Duration> maxBackOff);
+
+ GrpcWindmillStreamFactory build();
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java
new file mode 100644
index 00000000000..23f6fb801a4
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Exposes a {@link com.google.auth.Credentials} as the vendored gRPC {@link
+ * org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials} type by forwarding every call
+ * to the wrapped instance. Note that this class should override every method that is not final
+ * and not static and call the delegate directly.
+ *
+ * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation
+ * delegate to reduce maintenance burden.
+ */
+public class VendoredCredentialsAdapter
+    extends org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials {
+
+  /** The non-vendored credentials that every call is forwarded to. */
+  private final com.google.auth.Credentials delegate;
+
+  public VendoredCredentialsAdapter(com.google.auth.Credentials credentials) {
+    this.delegate = credentials;
+  }
+
+  @Override
+  public String getAuthenticationType() {
+    return delegate.getAuthenticationType();
+  }
+
+  @Override
+  public Map<String, List<String>> getRequestMetadata() throws IOException {
+    return delegate.getRequestMetadata();
+  }
+
+  @Override
+  public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
+    return delegate.getRequestMetadata(uri);
+  }
+
+  /**
+   * Forwards the asynchronous metadata request to the delegate. The vendored callback is wrapped
+   * in a {@link VendoredRequestMetadataCallbackAdapter} so the delegate can invoke it.
+   */
+  @Override
+  public void getRequestMetadata(
+      final URI uri,
+      Executor executor,
+      final org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback callback) {
+    VendoredRequestMetadataCallbackAdapter adaptedCallback =
+        new VendoredRequestMetadataCallbackAdapter(callback);
+    delegate.getRequestMetadata(uri, executor, adaptedCallback);
+  }
+
+  @Override
+  public boolean hasRequestMetadata() {
+    return delegate.hasRequestMetadata();
+  }
+
+  @Override
+  public boolean hasRequestMetadataOnly() {
+    return delegate.hasRequestMetadataOnly();
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    delegate.refresh();
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java
new file mode 100644
index 00000000000..8b1b695287e
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wraps a vendored {@link
+ * org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback} so that it can be
+ * used wherever a {@link com.google.auth.RequestMetadataCallback} is expected. Note that this
+ * class should override every method that is not final and not static and call the delegate
+ * directly.
+ *
+ * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation
+ * delegate to reduce maintenance burden.
+ */
+public class VendoredRequestMetadataCallbackAdapter
+    implements com.google.auth.RequestMetadataCallback {
+
+  /** The vendored callback that receives every forwarded notification. */
+  private final org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback
+      delegate;
+
+  VendoredRequestMetadataCallbackAdapter(
+      org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback callback) {
+    this.delegate = callback;
+  }
+
+  @Override
+  public void onSuccess(Map<String, List<String>> metadata) {
+    delegate.onSuccess(metadata);
+  }
+
+  @Override
+  public void onFailure(Throwable exception) {
+    delegate.onFailure(exception);
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
similarity index 98%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
index 3c7798126e5..5fb22476ab3 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java
similarity index 96%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java
index a1f80598d89..007717d03b5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
import
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java
similarity index 97%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java
index e0878b7b0b9..e3f12687638 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
import java.util.function.Function;
import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
new file mode 100644
index 00000000000..48cf8ff3f76
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.net.Inet6Address;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLException;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+
+/** Static helpers that build the different kinds of RPC channels. */
+public final class WindmillChannelFactory {
+  public static final String LOCALHOST = "localhost";
+  private static final int DEFAULT_GRPC_PORT = 443;
+  private static final int FLOW_CONTROL_WINDOW = 10 * 1024 * 1024;
+  private static final int MAX_INBOUND_METADATA_SIZE = 1024 * 1024;
+
+  private WindmillChannelFactory() {}
+
+  /** Returns an in-process channel named {@code channelName} that uses a direct executor. */
+  public static ManagedChannel inProcessChannel(String channelName) {
+    InProcessChannelBuilder inProcessBuilder = InProcessChannelBuilder.forName(channelName);
+    return inProcessBuilder.directExecutor().build();
+  }
+
+  /** Returns a plaintext channel to {@link #LOCALHOST} on the given {@code port}. */
+  public static Channel localhostChannel(int port) {
+    NettyChannelBuilder localBuilder = NettyChannelBuilder.forAddress(LOCALHOST, port);
+    localBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
+    localBuilder.negotiationType(NegotiationType.PLAINTEXT);
+    return localBuilder.build();
+  }
+
+  /**
+   * Returns a remote channel for the given address, dispatching on the kind of address it holds.
+   *
+   * @throws UnsupportedOperationException if the address kind is neither IPV6 nor
+   *     GCP_SERVICE_ADDRESS.
+   */
+  static Channel remoteChannel(
+      WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) {
+    switch (windmillServiceAddress.getKind()) {
+      case GCP_SERVICE_ADDRESS:
+        return remoteChannel(
+            windmillServiceAddress.gcpServiceAddress(), windmillServiceRpcChannelTimeoutSec);
+      case IPV6:
+        return remoteChannel(windmillServiceAddress.ipv6(), windmillServiceRpcChannelTimeoutSec);
+        // switch is exhaustive, so the default branch should never be reached.
+      default:
+        throw new UnsupportedOperationException(
+            "Only IPV6 and GCP_SERVICE_ADDRESS are supported WindmillServiceAddresses.");
+    }
+  }
+
+  /**
+   * Returns a TLS channel to the host and port of {@code endpoint}.
+   *
+   * @throws WindmillChannelCreationException if the SSL context cannot be built.
+   */
+  public static Channel remoteChannel(
+      HostAndPort endpoint, int windmillServiceRpcChannelTimeoutSec) {
+    NettyChannelBuilder endpointBuilder =
+        NettyChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort());
+    try {
+      return createRemoteChannel(endpointBuilder, windmillServiceRpcChannelTimeoutSec);
+    } catch (SSLException sslException) {
+      throw new WindmillChannelCreationException(endpoint, sslException);
+    }
+  }
+
+  /**
+   * Returns a TLS channel to {@code directEndpoint} on the given {@code port}.
+   *
+   * @throws WindmillChannelCreationException if the SSL context cannot be built.
+   */
+  public static Channel remoteChannel(
+      Inet6Address directEndpoint, int port, int windmillServiceRpcChannelTimeoutSec) {
+    NettyChannelBuilder directBuilder =
+        NettyChannelBuilder.forAddress(new InetSocketAddress(directEndpoint, port));
+    try {
+      return createRemoteChannel(directBuilder, windmillServiceRpcChannelTimeoutSec);
+    } catch (SSLException sslException) {
+      throw new WindmillChannelCreationException(directEndpoint.toString(), sslException);
+    }
+  }
+
+  /**
+   * Returns a TLS channel to {@code directEndpoint} on the default gRPC port (443).
+   *
+   * @throws WindmillChannelCreationException if the SSL context cannot be built.
+   */
+  public static Channel remoteChannel(
+      Inet6Address directEndpoint, int windmillServiceRpcChannelTimeoutSec) {
+    return remoteChannel(directEndpoint, DEFAULT_GRPC_PORT, windmillServiceRpcChannelTimeoutSec);
+  }
+
+  /**
+   * Applies the shared remote-channel settings to {@code channelBuilder} and builds the channel.
+   * Keep-alive is configured only when {@code windmillServiceRpcChannelTimeoutSec} is positive.
+   */
+  @SuppressWarnings("nullness")
+  private static Channel createRemoteChannel(
+      NettyChannelBuilder channelBuilder, int windmillServiceRpcChannelTimeoutSec)
+      throws SSLException {
+    boolean keepAliveEnabled = windmillServiceRpcChannelTimeoutSec > 0;
+    if (keepAliveEnabled) {
+      channelBuilder.keepAliveTime(windmillServiceRpcChannelTimeoutSec, TimeUnit.SECONDS);
+      channelBuilder.keepAliveTimeout(windmillServiceRpcChannelTimeoutSec, TimeUnit.SECONDS);
+      channelBuilder.keepAliveWithoutCalls(true);
+    }
+
+    channelBuilder.flowControlWindow(FLOW_CONTROL_WINDOW);
+    channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
+    channelBuilder.maxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE);
+    channelBuilder.negotiationType(NegotiationType.TLS);
+    // Set ciphers(null) to not use GCM, which is disabled for Dataflow
+    // due to it being horribly slow.
+    channelBuilder.sslContext(GrpcSslContexts.forClient().ciphers(null).build());
+    return channelBuilder.build();
+  }
+
+  /** Thrown when a remote channel cannot be created; wraps the underlying cause. */
+  public static class WindmillChannelCreationException extends IllegalStateException {
+    private WindmillChannelCreationException(HostAndPort endpoint, SSLException sourceException) {
+      super(
+          String.format(
+              "Exception thrown when trying to create channel to endpoint={host:%s; port:%d}",
+              endpoint.getHost(), endpoint.getPort()),
+          sourceException);
+    }
+
+    WindmillChannelCreationException(String directEndpoint, Throwable sourceException) {
+      super(
+          String.format(
+              "Exception thrown when trying to create channel to endpoint={%s}", directEndpoint),
+          sourceException);
+    }
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java
new file mode 100644
index 00000000000..0c7719b0bc1
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
+
+import com.google.auth.Credentials;
+import com.google.auto.value.AutoOneOf;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.auth.MoreCallCredentials;
+
+/**
+ * Creates the stubs used to talk to Streaming Engine. A factory is one of two kinds: in-process
+ * (for testing) or remote.
+ */
+@AutoOneOf(WindmillStubFactory.Kind.class)
+public abstract class WindmillStubFactory {
+
+  /**
+   * Returns an in-process factory whose stubs run over the channel that {@code channelFactory}
+   * produces for {@code testName}. The channel is created each time the supplier is invoked.
+   */
+  public static WindmillStubFactory inProcessStubFactory(
+      String testName, Function<String, ManagedChannel> channelFactory) {
+    Supplier<CloudWindmillServiceV1Alpha1Stub> stubSupplier =
+        () -> CloudWindmillServiceV1Alpha1Grpc.newStub(channelFactory.apply(testName));
+    return AutoOneOf_WindmillStubFactory.inProcess(stubSupplier);
+  }
+
+  /**
+   * Returns an in-process factory whose stubs run over {@link
+   * WindmillChannelFactory#inProcessChannel(String)} for {@code testName}.
+   */
+  public static WindmillStubFactory inProcessStubFactory(String testName) {
+    return inProcessStubFactory(testName, WindmillChannelFactory::inProcessChannel);
+  }
+
+  /**
+   * Returns a remote factory. For each address it opens a remote channel with the given timeout
+   * and attaches call credentials derived from {@code gcpCredentials}.
+   */
+  public static WindmillStubFactory remoteStubFactory(
+      int rpcChannelTimeoutSec, Credentials gcpCredentials) {
+    Function<WindmillServiceAddress, CloudWindmillServiceV1Alpha1Stub> stubFactory =
+        directEndpoint -> {
+          CloudWindmillServiceV1Alpha1Stub stub =
+              CloudWindmillServiceV1Alpha1Grpc.newStub(
+                  remoteChannel(directEndpoint, rpcChannelTimeoutSec));
+          return stub.withCallCredentials(
+              MoreCallCredentials.from(new VendoredCredentialsAdapter(gcpCredentials)));
+        };
+    return AutoOneOf_WindmillStubFactory.remote(stubFactory);
+  }
+
+  /** Returns which kind of factory this is. */
+  public abstract Kind getKind();
+
+  /** Returns the in-process stub supplier; valid when the kind is {@link Kind#IN_PROCESS}. */
+  public abstract Supplier<CloudWindmillServiceV1Alpha1Stub> inProcess();
+
+  /** Returns the remote stub function; valid when the kind is {@link Kind#REMOTE}. */
+  public abstract Function<WindmillServiceAddress, CloudWindmillServiceV1Alpha1Stub> remote();
+
+  public enum Kind {
+    IN_PROCESS,
+    REMOTE
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java
new file mode 100644
index 00000000000..6b8dd272037
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.throttling;
+
+import com.google.auto.value.AutoValue;
+
+/** Groups the three {@link ThrottleTimer}s kept for GetWork, GetData and CommitWork. */
+@AutoValue
+public abstract class StreamingEngineThrottleTimers {
+
+  /** Returns a new instance holding three fresh {@link ThrottleTimer}s. */
+  public static StreamingEngineThrottleTimers create() {
+    ThrottleTimer getWorkTimer = new ThrottleTimer();
+    ThrottleTimer getDataTimer = new ThrottleTimer();
+    ThrottleTimer commitWorkTimer = new ThrottleTimer();
+    return new AutoValue_StreamingEngineThrottleTimers(
+        getWorkTimer, getDataTimer, commitWorkTimer);
+  }
+
+  /** Resets all three timers and returns the sum of the throttle times they reported. */
+  public long getAndResetThrottleTime() {
+    long getWorkThrottleTime = getWorkThrottleTimer().getAndResetThrottleTime();
+    long getDataThrottleTime = getDataThrottleTimer().getAndResetThrottleTime();
+    long commitWorkThrottleTime = commitWorkThrottleTimer().getAndResetThrottleTime();
+    return getWorkThrottleTime + getDataThrottleTime + commitWorkThrottleTime;
+  }
+
+  /** Timer for the GetWork stream. */
+  public abstract ThrottleTimer getWorkThrottleTimer();
+
+  /** Timer for the GetData stream. */
+  public abstract ThrottleTimer getDataThrottleTimer();
+
+  /** Timer for the CommitWork stream. */
+  public abstract ThrottleTimer commitWorkThrottleTimer();
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
similarity index 94%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
index 237339aff39..f660112721b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.throttling;
import org.joda.time.Instant;
@@ -25,7 +25,7 @@ import org.joda.time.Instant;
* CommitWork are both blocked for x, totalTime will be 2x. However, if 2
GetWork streams are both
* blocked for x totalTime will be x. All methods are thread safe.
*/
-class ThrottleTimer {
+public final class ThrottleTimer {
// This is -1 if not currently being throttled or the time in
// milliseconds when throttling for this type started.
private long startTime = -1;
@@ -36,7 +36,7 @@ class ThrottleTimer {
/**
* Starts the timer if it has not been started and does nothing if it has
already been started.
*/
- synchronized void start() {
+ public synchronized void start() {
if (!throttled()) { // This timer is not started yet so start it now.
startTime = Instant.now().getMillis();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java
deleted file mode 100644
index 19cb90297df..00000000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java
+++ /dev/null
@@ -1,607 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-import
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.BackOff;
-import org.apache.beam.sdk.util.BackOffUtils;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Sleeper;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.auth.MoreCallCredentials;
-import
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** gRPC client for communicating with Streaming Engine. */
-// Very likely real potential for bugs -
https://github.com/apache/beam/issues/19273
-// Very likely real potential for bugs -
https://github.com/apache/beam/issues/19271
-@SuppressFBWarnings({"JLM_JSR166_UTILCONCURRENT_MONITORENTER",
"IS2_INCONSISTENT_SYNC"})
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-public final class GrpcWindmillServer extends WindmillServerStub {
- private static final Logger LOG =
LoggerFactory.getLogger(GrpcWindmillServer.class);
-
- // If a connection cannot be established, gRPC will fail fast so this
deadline can be relatively
- // high.
- private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
- private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20;
- private static final String LOCALHOST = "localhost";
- private static final Duration MIN_BACKOFF = Duration.millis(1);
- private static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
- private static final AtomicLong nextId = new AtomicLong(0);
- private static final int NO_HEALTH_CHECK = -1;
-
- private final StreamingDataflowWorkerOptions options;
- private final int streamingRpcBatchLimit;
- private final
List<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub>
stubList;
- private final ThrottleTimer getWorkThrottleTimer;
- private final ThrottleTimer getDataThrottleTimer;
- private final ThrottleTimer commitWorkThrottleTimer;
- private final Random rand;
- private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
- private ImmutableSet<HostAndPort> endpoints;
- private int logEveryNStreamFailures;
- private Duration maxBackoff = MAX_BACKOFF;
- private WindmillApplianceGrpc.WindmillApplianceBlockingStub
syncApplianceStub = null;
-
- private GrpcWindmillServer(StreamingDataflowWorkerOptions options) {
- this.options = options;
- this.streamingRpcBatchLimit =
options.getWindmillServiceStreamingRpcBatchLimit();
- this.stubList = new ArrayList<>();
- this.logEveryNStreamFailures =
options.getWindmillServiceStreamingLogEveryNStreamFailures();
- this.endpoints = ImmutableSet.of();
- this.getWorkThrottleTimer = new ThrottleTimer();
- this.getDataThrottleTimer = new ThrottleTimer();
- this.commitWorkThrottleTimer = new ThrottleTimer();
- this.rand = new Random();
- this.streamRegistry = Collections.newSetFromMap(new ConcurrentHashMap<>());
- }
-
- private static StreamingDataflowWorkerOptions testOptions(boolean
enableStreamingEngine) {
- StreamingDataflowWorkerOptions options =
-
PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class);
- options.setProject("project");
- options.setJobId("job");
- options.setWorkerId("worker");
- List<String> experiments =
- options.getExperiments() == null ? new ArrayList<>() :
options.getExperiments();
- if (enableStreamingEngine) {
- experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
- }
- options.setExperiments(experiments);
-
- options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE);
- options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK);
-
options.setWindmillServiceStreamingLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_FAILURES);
-
- return options;
- }
-
- /** Create new instance of {@link GrpcWindmillServer}. */
- public static GrpcWindmillServer create(StreamingDataflowWorkerOptions
workerOptions)
- throws IOException {
- GrpcWindmillServer grpcWindmillServer = new
GrpcWindmillServer(workerOptions);
- if (workerOptions.getWindmillServiceEndpoint() != null) {
- grpcWindmillServer.configureWindmillServiceEndpoints();
- } else if (!workerOptions.isEnableStreamingEngine()
- && workerOptions.getLocalWindmillHostport() != null) {
- grpcWindmillServer.configureLocalHost();
- }
-
- if (workerOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs() > 0)
{
- grpcWindmillServer.scheduleHealthCheckTimer(
- workerOptions, () -> grpcWindmillServer.streamRegistry);
- }
-
- return grpcWindmillServer;
- }
-
- @VisibleForTesting
- static GrpcWindmillServer newTestInstance(String name) {
- GrpcWindmillServer testServer =
- new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */ true));
-
testServer.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel(name)));
- return testServer;
- }
-
- @VisibleForTesting
- static GrpcWindmillServer newApplianceTestInstance(Channel channel) {
- GrpcWindmillServer testServer =
- new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */
false));
- testServer.syncApplianceStub =
createWindmillApplianceStubWithDeadlineInterceptor(channel);
- return testServer;
- }
-
- private static WindmillApplianceGrpc.WindmillApplianceBlockingStub
- createWindmillApplianceStubWithDeadlineInterceptor(Channel channel) {
- return WindmillApplianceGrpc.newBlockingStub(channel)
-
.withInterceptors(GrpcDeadlineClientInterceptor.withDefaultUnaryRpcDeadline());
- }
-
- private static Channel inProcessChannel(String name) {
- return InProcessChannelBuilder.forName(name).directExecutor().build();
- }
-
- private static Channel localhostChannel(int port) {
- return NettyChannelBuilder.forAddress(LOCALHOST, port)
- .maxInboundMessageSize(Integer.MAX_VALUE)
- .negotiationType(NegotiationType.PLAINTEXT)
- .build();
- }
-
- private static UnsupportedOperationException
unsupportedUnaryRequestInStreamingEngineException(
- String rpcName) {
- return new UnsupportedOperationException(
- String.format("Unary %s calls are not supported in Streaming Engine.",
rpcName));
- }
-
- private void scheduleHealthCheckTimer(
- StreamingDataflowWorkerOptions options,
Supplier<Set<AbstractWindmillStream<?, ?>>> streams) {
- new Timer("WindmillHealthCheckTimer")
- .schedule(
- new HealthCheckTimerTask(options, streams),
- 0,
- options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
- }
-
- private void configureWindmillServiceEndpoints() throws IOException {
- Set<HostAndPort> endpoints = new HashSet<>();
- for (String endpoint :
Splitter.on(',').split(options.getWindmillServiceEndpoint())) {
- endpoints.add(
-
HostAndPort.fromString(endpoint).withDefaultPort(options.getWindmillServicePort()));
- }
- initializeWindmillService(endpoints);
- }
-
- private void configureLocalHost() {
- int portStart = options.getLocalWindmillHostport().lastIndexOf(':');
- String endpoint = options.getLocalWindmillHostport().substring(0,
portStart);
- assert ("grpc:localhost".equals(endpoint));
- int port =
Integer.parseInt(options.getLocalWindmillHostport().substring(portStart + 1));
- this.endpoints = ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port));
- initializeLocalHost(port);
- }
-
- @Override
- public synchronized void setWindmillServiceEndpoints(Set<HostAndPort>
endpoints)
- throws IOException {
- Preconditions.checkNotNull(endpoints);
- if (endpoints.equals(this.endpoints)) {
- // The endpoints are equal don't recreate the stubs.
- return;
- }
- LOG.info("Creating a new windmill stub, endpoints: {}", endpoints);
- if (this.endpoints != null) {
- LOG.info("Previous windmill stub endpoints: {}", this.endpoints);
- }
- initializeWindmillService(endpoints);
- }
-
- @Override
- public synchronized boolean isReady() {
- return !stubList.isEmpty();
- }
-
- private synchronized void initializeLocalHost(int port) {
- this.logEveryNStreamFailures = 1;
- this.maxBackoff = Duration.millis(500);
- Channel channel = localhostChannel(port);
- if (options.isEnableStreamingEngine()) {
- this.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(channel));
- } else {
- this.syncApplianceStub =
createWindmillApplianceStubWithDeadlineInterceptor(channel);
- }
- }
-
- private synchronized void initializeWindmillService(Set<HostAndPort>
endpoints)
- throws IOException {
- LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}",
endpoints);
- this.stubList.clear();
- this.endpoints = ImmutableSet.copyOf(endpoints);
- for (HostAndPort endpoint : this.endpoints) {
- if (LOCALHOST.equals(endpoint.getHost())) {
- initializeLocalHost(endpoint.getPort());
- } else {
- this.stubList.add(
- CloudWindmillServiceV1Alpha1Grpc.newStub(remoteChannel(endpoint))
- .withCallCredentials(
- MoreCallCredentials.from(
- new
VendoredCredentialsAdapter(options.getGcpCredential()))));
- }
- }
- }
-
- private Channel remoteChannel(HostAndPort endpoint) throws IOException {
- NettyChannelBuilder builder =
- NettyChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort());
- int timeoutSec = options.getWindmillServiceRpcChannelAliveTimeoutSec();
- if (timeoutSec > 0) {
- builder
- .keepAliveTime(timeoutSec, TimeUnit.SECONDS)
- .keepAliveTimeout(timeoutSec, TimeUnit.SECONDS)
- .keepAliveWithoutCalls(true);
- }
- return builder
- .flowControlWindow(10 * 1024 * 1024)
- .maxInboundMessageSize(Integer.MAX_VALUE)
- .maxInboundMetadataSize(1024 * 1024)
- .negotiationType(NegotiationType.TLS)
- // Set ciphers(null) to not use GCM, which is disabled for Dataflow
- // due to it being horribly slow.
- .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
- .build();
- }
-
- /**
- * Stubs returned from this method do not (and should not) have {@link
- * org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Deadline}(s) set since they
represent an absolute
- * point in time. {@link
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Deadline}(s) should not be
- * treated as a timeout which represents a relative point in time.
- *
- * @see <a href=https://grpc.io/blog/deadlines/>Official gRPC deadline
documentation for more
- * details.</a>
- */
- private synchronized
CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub() {
- if (stubList.isEmpty()) {
- throw new RuntimeException("windmillServiceEndpoint has not been set");
- }
-
- return stubList.size() == 1 ? stubList.get(0) :
stubList.get(rand.nextInt(stubList.size()));
- }
-
- @Override
- public void appendSummaryHtml(PrintWriter writer) {
- writer.write("Active Streams:<br>");
- for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
- stream.appendSummaryHtml(writer);
- writer.write("<br>");
- }
- }
-
- // Configure backoff to retry calls forever, with a maximum sane retry
interval.
- private BackOff grpcBackoff() {
- return FluentBackoff.DEFAULT
- .withInitialBackoff(MIN_BACKOFF)
- .withMaxBackoff(maxBackoff)
- .backoff();
- }
-
- private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
- BackOff backoff = grpcBackoff();
- int rpcErrors = 0;
- while (true) {
- try {
- return function.get();
- } catch (StatusRuntimeException e) {
- try {
- if (++rpcErrors % 20 == 0) {
- LOG.warn(
- "Many exceptions calling gRPC. Last exception: {} with status
{}",
- e,
- e.getStatus());
- }
- if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
- throw new RpcException(e);
- }
- } catch (IOException | InterruptedException i) {
- if (i instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- RpcException rpcException = new RpcException(e);
- rpcException.addSuppressed(i);
- throw rpcException;
- }
- }
- }
- }
-
- @Override
- public GetWorkResponse getWork(GetWorkRequest request) {
- if (syncApplianceStub != null) {
- return callWithBackoff(() -> syncApplianceStub.getWork(request));
- }
-
- throw new
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork"));
- }
-
- @Override
- public GetDataResponse getData(GetDataRequest request) {
- if (syncApplianceStub != null) {
- return callWithBackoff(() -> syncApplianceStub.getData(request));
- }
-
- throw new
RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData"));
- }
-
- @Override
- public CommitWorkResponse commitWork(CommitWorkRequest request) {
- if (syncApplianceStub != null) {
- return callWithBackoff(() -> syncApplianceStub.commitWork(request));
- }
- throw new
RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork"));
- }
-
- private StreamObserverFactory newStreamObserverFactory() {
- return StreamObserverFactory.direct(
- DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2,
options.getWindmillMessagesBetweenIsReadyChecks());
- }
-
- @Override
- public GetWorkStream getWorkStream(GetWorkRequest request, WorkItemReceiver
receiver) {
- GetWorkRequest getWorkRequest =
- GetWorkRequest.newBuilder(request)
- .setJobId(options.getJobId())
- .setProjectId(options.getProject())
- .setWorkerId(options.getWorkerId())
- .build();
-
- return GrpcGetWorkStream.create(
- responseObserver ->
- stub()
- // Deadlines are absolute points in time, so generate a new
one everytime this
- // function is called.
- .withDeadlineAfter(
-
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
- .getWorkStream(responseObserver),
- getWorkRequest,
- grpcBackoff(),
- newStreamObserverFactory(),
- streamRegistry,
- logEveryNStreamFailures,
- getWorkThrottleTimer,
- receiver);
- }
-
- @Override
- public GetDataStream getDataStream() {
- return GrpcGetDataStream.create(
- responseObserver ->
- stub()
- // Deadlines are absolute points in time, so generate a new
one everytime this
- // function is called.
- .withDeadlineAfter(
-
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
- .getDataStream(responseObserver),
- grpcBackoff(),
- newStreamObserverFactory(),
- streamRegistry,
- logEveryNStreamFailures,
- getDataThrottleTimer,
- makeHeader(),
- nextId,
- streamingRpcBatchLimit);
- }
-
- @Override
- public CommitWorkStream commitWorkStream() {
- return GrpcCommitWorkStream.create(
- responseObserver ->
- stub()
- // Deadlines are absolute points in time, so generate a new
one everytime this
- // function is called.
- .withDeadlineAfter(
-
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
- .commitWorkStream(responseObserver),
- grpcBackoff(),
- newStreamObserverFactory(),
- streamRegistry,
- logEveryNStreamFailures,
- commitWorkThrottleTimer,
- makeHeader(),
- nextId,
- streamingRpcBatchLimit);
- }
-
- @Override
- public GetConfigResponse getConfig(GetConfigRequest request) {
- if (syncApplianceStub != null) {
- return callWithBackoff(() -> syncApplianceStub.getConfig(request));
- }
-
- throw new RpcException(
- new UnsupportedOperationException("GetConfig not supported in
Streaming Engine."));
- }
-
- @Override
- public ReportStatsResponse reportStats(ReportStatsRequest request) {
- if (syncApplianceStub != null) {
- return callWithBackoff(() -> syncApplianceStub.reportStats(request));
- }
-
- throw new RpcException(
- new UnsupportedOperationException("ReportStats not supported in
Streaming Engine."));
- }
-
- @Override
- public long getAndResetThrottleTime() {
- return getWorkThrottleTimer.getAndResetThrottleTime()
- + getDataThrottleTimer.getAndResetThrottleTime()
- + commitWorkThrottleTimer.getAndResetThrottleTime();
- }
-
- private JobHeader makeHeader() {
- return JobHeader.newBuilder()
- .setJobId(options.getJobId())
- .setProjectId(options.getProject())
- .setWorkerId(options.getWorkerId())
- .build();
- }
-
- /**
- * Create a wrapper around credentials callback that delegates to the
underlying vendored {@link
- * com.google.auth.RequestMetadataCallback}. Note that this class should
override every method
- * that is not final and not static and call the delegate directly.
- *
- * <p>TODO: Replace this with an auto generated proxy which calls the
underlying implementation
- * delegate to reduce maintenance burden.
- */
- private static class VendoredRequestMetadataCallbackAdapter
- implements com.google.auth.RequestMetadataCallback {
-
- private final
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback
- callback;
-
- private VendoredRequestMetadataCallbackAdapter(
-
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback
callback) {
- this.callback = callback;
- }
-
- @Override
- public void onSuccess(Map<String, List<String>> metadata) {
- callback.onSuccess(metadata);
- }
-
- @Override
- public void onFailure(Throwable exception) {
- callback.onFailure(exception);
- }
- }
-
- /**
- * Create a wrapper around credentials that delegates to the underlying
{@link
- * com.google.auth.Credentials}. Note that this class should override every
method that is not
- * final and not static and call the delegate directly.
- *
- * <p>TODO: Replace this with an auto generated proxy which calls the
underlying implementation
- * delegate to reduce maintenance burden.
- */
- private static class VendoredCredentialsAdapter
- extends org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials {
-
- private final com.google.auth.Credentials credentials;
-
- private VendoredCredentialsAdapter(com.google.auth.Credentials
credentials) {
- this.credentials = credentials;
- }
-
- @Override
- public String getAuthenticationType() {
- return credentials.getAuthenticationType();
- }
-
- @Override
- public Map<String, List<String>> getRequestMetadata() throws IOException {
- return credentials.getRequestMetadata();
- }
-
- @Override
- public void getRequestMetadata(
- final URI uri,
- Executor executor,
- final
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback
- callback) {
- credentials.getRequestMetadata(
- uri, executor, new VendoredRequestMetadataCallbackAdapter(callback));
- }
-
- @Override
- public Map<String, List<String>> getRequestMetadata(URI uri) throws
IOException {
- return credentials.getRequestMetadata(uri);
- }
-
- @Override
- public boolean hasRequestMetadata() {
- return credentials.hasRequestMetadata();
- }
-
- @Override
- public boolean hasRequestMetadataOnly() {
- return credentials.hasRequestMetadataOnly();
- }
-
- @Override
- public void refresh() throws IOException {
- credentials.refresh();
- }
- }
-
- private static class HealthCheckTimerTask extends TimerTask {
- private final StreamingDataflowWorkerOptions options;
- private final Supplier<Set<AbstractWindmillStream<?, ?>>> streams;
-
- public HealthCheckTimerTask(
- StreamingDataflowWorkerOptions options,
- Supplier<Set<AbstractWindmillStream<?, ?>>> streams) {
- this.options = options;
- this.streams = streams;
- }
-
- @Override
- public void run() {
- Instant reportThreshold =
- Instant.now()
-
.minus(Duration.millis(options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()));
- for (AbstractWindmillStream<?, ?> stream : streams.get()) {
- stream.maybeSendHealthCheck(reportThreshold);
- }
- }
- }
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
new file mode 100644
index 00000000000..307dfdfa17b
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work;
+
+import java.util.Collection;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/** Functional interface for receiving WorkItems. */
+@FunctionalInterface
+public interface WorkItemReceiver {
+ void receiveWork(
+ String computation,
+ @Nullable Instant inputDataWatermark,
+ @Nullable Instant synchronizedProcessingTime,
+ Windmill.WorkItem workItem,
+ Collection<Windmill.LatencyAttribution> getWorkStreamLatencies);
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java
new file mode 100644
index 00000000000..0038e3e9cc6
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import com.google.auto.value.AutoValue;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Budget of items and bytes for fetching {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via
{@link
+ * WindmillStream.GetWorkStream}. Used to control how "much" work is returned
from Windmill.
+ */
+@AutoValue
+public abstract class GetWorkBudget {
+ public static GetWorkBudget.Builder builder() {
+ return new AutoValue_GetWorkBudget.Builder();
+ }
+
+ /** {@link GetWorkBudget} of 0. */
+ public static GetWorkBudget noBudget() {
+ return builder().setItems(0).setBytes(0).build();
+ }
+
+ public static GetWorkBudget from(GetWorkRequest getWorkRequest) {
+ return builder()
+ .setItems(getWorkRequest.getMaxItems())
+ .setBytes(getWorkRequest.getMaxBytes())
+ .build();
+ }
+
+ /**
+ * Adds the given bytes and items to the current budget, returning a new
{@link GetWorkBudget}.
+ * Does not drop below 0.
+ */
+ public GetWorkBudget add(long items, long bytes) {
+ Preconditions.checkArgument(items >= 0 && bytes >= 0);
+ return GetWorkBudget.builder().setBytes(bytes() + bytes).setItems(items()
+ items).build();
+ }
+
+ public GetWorkBudget add(GetWorkBudget other) {
+ return add(other.items(), other.bytes());
+ }
+
+ /**
+ * Subtracts the given bytes and items from the current budget, returning a
new {@link
+ * GetWorkBudget}. Does not drop below 0.
+ */
+ public GetWorkBudget subtract(long items, long bytes) {
+ Preconditions.checkArgument(items >= 0 && bytes >= 0);
+ return GetWorkBudget.builder().setBytes(bytes() - bytes).setItems(items()
- items).build();
+ }
+
+ public GetWorkBudget subtract(GetWorkBudget other) {
+ return subtract(other.items(), other.bytes());
+ }
+
+ /** Budget of bytes for GetWork. Does not drop below 0. */
+ public abstract long bytes();
+
+ /** Budget of items for GetWork. Does not drop below 0. */
+ public abstract long items();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setBytes(long bytes);
+
+ public abstract Builder setItems(long budget);
+
+ abstract long items();
+
+ abstract long bytes();
+
+ abstract GetWorkBudget autoBuild();
+
+ public final GetWorkBudget build() {
+ setItems(Math.max(0, items()));
+ setBytes(Math.max(0, bytes()));
+ return autoBuild();
+ }
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index 4700217dc8a..092f5e59a13 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -53,9 +53,11 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribut
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Duration;
@@ -198,8 +200,7 @@ class FakeWindmillServer extends WindmillServerStub {
}
@Override
- public GetWorkStream getWorkStream(
- Windmill.GetWorkRequest request, GetWorkStream.WorkItemReceiver
receiver) {
+ public GetWorkStream getWorkStream(Windmill.GetWorkRequest request,
WorkItemReceiver receiver) {
LOG.debug("getWorkStream: {}", request.toString());
Instant startTime = Instant.now();
final CountDownLatch done = new CountDownLatch(1);
@@ -209,6 +210,19 @@ class FakeWindmillServer extends WindmillServerStub {
done.countDown();
}
+ @Override
+ public void adjustBudget(long itemsDelta, long bytesDelta) {
+ // no-op.
+ }
+
+ @Override
+ public GetWorkBudget remainingBudget() {
+ return GetWorkBudget.builder()
+ .setItems(request.getMaxItems())
+ .setBytes(request.getMaxBytes())
+ .build();
+ }
+
@Override
public boolean awaitTermination(int time, TimeUnit unit) throws
InterruptedException {
while (done.getCount() > 0) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
similarity index 99%
rename from
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java
rename to
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
index 9924bb7d2b2..264540531bf 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
similarity index 96%
rename from
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java
rename to
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
index 45ed3381a8b..e3b07bf7aa4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import static com.google.common.truth.Truth.assertThat;
-import static
org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS;
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
@@ -33,13 +33,14 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
-import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
similarity index 98%
rename from
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java
rename to
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 53afc6990e4..d9f4b72716c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -68,9 +68,9 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.Value;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
@@ -99,10 +99,7 @@ import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Unit tests for {@link
- *
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer}.
- */
+/** Unit tests for {@link GrpcWindmillServer}. */
@RunWith(JUnit4.class)
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
@@ -114,7 +111,7 @@ public class GrpcWindmillServerTest {
private final MutableHandlerRegistry serviceRegistry = new
MutableHandlerRegistry();
@Rule public ErrorCollector errorCollector = new ErrorCollector();
private Server server;
- private
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer
client;
+ private GrpcWindmillServer client;
private int remainingErrors = 20;
@Before
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
new file mode 100644
index 00000000000..76d50839785
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GetWorkBudgetTest {
+
+ @Test
+ public void testCreateWithNoBudget() {
+ GetWorkBudget getWorkBudget = GetWorkBudget.noBudget();
+ assertEquals(0, getWorkBudget.items());
+ assertEquals(0, getWorkBudget.bytes());
+ }
+
+ @Test
+ public void testBuild_itemsAndBytesNeverBelowZero() {
+ GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(-10).setBytes(-10).build();
+ assertEquals(0, getWorkBudget.items());
+ assertEquals(0, getWorkBudget.bytes());
+ }
+
+ @Test
+ public void testAdd_doesNotAllowNegativeParameters() {
+ GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build();
+ assertThrows(IllegalArgumentException.class, () -> getWorkBudget.add(-1, -1));
+ }
+
+ @Test
+ public void testSubtract_itemsAndBytesNeverBelowZero() {
+ GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build();
+ GetWorkBudget subtracted = getWorkBudget.subtract(10, 10);
+ assertEquals(0, subtracted.items());
+ assertEquals(0, subtracted.bytes());
+ }
+
+ @Test
+ public void testSubtractGetWorkBudget_itemsAndBytesNeverBelowZero() {
+ GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build();
+ GetWorkBudget subtracted =
+ getWorkBudget.subtract(GetWorkBudget.builder().setItems(10).setBytes(10).build());
+ assertEquals(0, subtracted.items());
+ assertEquals(0, subtracted.bytes());
+ }
+
+ @Test
+ public void testSubtract_doesNotAllowNegativeParameters() {
+ GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build();
+ assertThrows(IllegalArgumentException.class, () -> getWorkBudget.subtract(-1, -1));
+ }
+}