scwhittle commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1477937118
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java:
##########
@@ -101,36 +103,21 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
// newer ComputationHeartbeatRequests.
private final boolean sendKeyedGetDataRequests;
private Consumer<List<ComputationHeartbeatResponse>>
processHeartbeatResponses;
+ private @Nullable GrpcWindmillStreamFactory windmillStreamFactory;
Review Comment:
If we can change the constructor to require the stream factory, this @Nullable and the
preconditions below can be removed; see the other comment.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,173 @@ class GrpcDispatcherClient {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcDispatcherClient.class);
private final WindmillStubFactory windmillStubFactory;
- @GuardedBy("this")
- private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
- @GuardedBy("this")
- private final Set<HostAndPort> dispatcherEndpoints;
+ /**
+ * Current dispatcher endpoints and stubs used to communicate with Windmill
Dispatcher.
+ *
+ * @implNote Reads are lock free, writes are synchronized.
+ */
+ private final AtomicReference<DispatcherStubs> dispatcherStubs;
@GuardedBy("this")
private final Random rand;
private GrpcDispatcherClient(
WindmillStubFactory windmillStubFactory,
- List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
- Set<HostAndPort> dispatcherEndpoints,
+ DispatcherStubs initialDispatcherStubs,
Random rand) {
this.windmillStubFactory = windmillStubFactory;
- this.dispatcherStubs = dispatcherStubs;
- this.dispatcherEndpoints = dispatcherEndpoints;
this.rand = rand;
+ this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
}
static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
- return new GrpcDispatcherClient(
- windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+ return new GrpcDispatcherClient(windmillStubFactory,
DispatcherStubs.empty(), new Random());
}
@VisibleForTesting
static GrpcDispatcherClient forTesting(
WindmillStubFactory windmillGrpcStubFactory,
- List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+ List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+ List<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs,
Set<HostAndPort> dispatcherEndpoints) {
- Preconditions.checkArgument(dispatcherEndpoints.size() ==
dispatcherStubs.size());
+ Preconditions.checkArgument(
+ dispatcherEndpoints.size() == windmillServiceStubs.size()
+ && windmillServiceStubs.size() ==
windmillMetadataServiceStubs.size());
return new GrpcDispatcherClient(
- windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new
Random());
+ windmillGrpcStubFactory,
+ DispatcherStubs.create(
+ dispatcherEndpoints, windmillServiceStubs,
windmillMetadataServiceStubs),
+ new Random());
+ }
+
+ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+ ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+ dispatcherStubs.get().windmillServiceStubs();
+ Preconditions.checkState(
+ !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been
set");
+
+ return (windmillServiceStubs.size() == 1
+ ? windmillServiceStubs.get(0)
+ : randomlySelectNextStub(windmillServiceStubs));
}
- synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+ CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+ ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs =
+ dispatcherStubs.get().windmillMetadataServiceStubs();
Preconditions.checkState(
- !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been
set");
+ !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has
not been set");
+
+ return (windmillMetadataServiceStubs.size() == 1
+ ? windmillMetadataServiceStubs.get(0)
+ : randomlySelectNextStub(windmillMetadataServiceStubs));
+ }
- return (dispatcherStubs.size() == 1
- ? dispatcherStubs.get(0)
- : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+ private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+ return stubs.get(rand.nextInt(stubs.size()));
}
- synchronized boolean isReady() {
- return !dispatcherStubs.isEmpty();
+ boolean isReady() {
Review Comment:
This seems like it could lead to racy usage, e.g.:
if (client.isReady()) {
client.xxx
}
Since dispatcherStubs can change between the two calls, there may no longer be a stub
by the time the next call is made.
Can this be removed?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/InProcessWindmillStubFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.function.Function;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+/**
+ * Creates in process stubs to talk to Streaming Engine. Only recommended to
be used for testing.
+ */
+final class InProcessWindmillStubFactory implements WindmillStubFactory {
+ private final String testName;
+ private final Function<String, ManagedChannel> channelFactory;
+
+ InProcessWindmillStubFactory(String testName, Function<String,
ManagedChannel> channelFactory) {
Review Comment:
nit: this could just take a Supplier<ManagedChannel>.
If tests have a Function<String, ManagedChannel>, they can always bind the
test name themselves.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactories.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.google.auth.Credentials;
+import java.util.function.Function;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+
+@Internal
+public final class WindmillStubFactories {
Review Comment:
Should we get rid of this class? If the fake is moved to the test folder, this class
has just a single method. I think callers could use the RemoteWindmillStubFactory
constructor directly.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -422,13 +422,20 @@ public void run() {
commitThread.setName("CommitThread");
this.publishCounters = publishCounters;
+ this.clientId = clientIdGenerator.nextLong();
this.windmillServer = options.getWindmillServerStub();
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
+ windmillServer.start(
Review Comment:
Could we avoid the start method (which is more complex than a factory method
that takes the necessary dependencies and initializes) by moving the gRPC server
initialization into this file?
It's kind of odd that it lives within the getWindmillServerStub default factory.
It appears this is done so that tests can inject a fake server. One idea
would be to remove the default factory from the options, create a server here if
the option is null, and use the injected server otherwise.
Then you could create the GrpcWindmillServer directly, with either a
factory method or a constructor taking the JobHeader, without modifying the
WindmillServerStub interface.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/InProcessWindmillStubFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.function.Function;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+/**
+ * Creates in process stubs to talk to Streaming Engine. Only recommended to
be used for testing.
Review Comment:
Should this be moved to the test folder and renamed FakeWindmillStubFactory? It
ignores the serviceAddress.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java:
##########
@@ -84,14 +86,14 @@
})
@SuppressWarnings("nullness") //
TODO(https://github.com/apache/beam/issues/20497
public final class GrpcWindmillServer extends WindmillServerStub {
+ public static final Duration LOCALHOST_BACKOFF = Duration.millis(500);
+ public static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
Review Comment:
nit: it seems better to keep the max/min constants together.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -81,6 +70,75 @@ public static WindmillEndpoints.Builder builder() {
return new AutoValue_WindmillEndpoints.Builder();
}
+ private static Optional<WindmillServiceAddress> parseDirectEndpoint(
+ Windmill.WorkerMetadataResponse.Endpoint endpointProto, String
authenticatingService) {
+ Optional<WindmillServiceAddress> directEndpointIpV6Address =
+ tryParseDirectEndpointIntoIpV6Address(endpointProto)
+ .map(address ->
AuthenticatedGcpServiceAddress.create(authenticatingService, address))
+ .map(WindmillServiceAddress::create);
+
+ return directEndpointIpV6Address.isPresent()
+ ? directEndpointIpV6Address
+ : tryParseEndpointIntoHostAndPort(endpointProto.getDirectEndpoint())
+ .map(WindmillServiceAddress::create);
+ }
+
+ private static Optional<HostAndPort> tryParseEndpointIntoHostAndPort(String
directEndpoint) {
+ try {
+ return Optional.of(HostAndPort.fromString(directEndpoint));
+ } catch (IllegalArgumentException e) {
+ LOG.warn("{} cannot be parsed into a gcpServiceAddress", directEndpoint);
+ return Optional.empty();
+ }
+ }
+
+ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
+ Windmill.WorkerMetadataResponse.Endpoint endpointProto) {
+ if (!endpointProto.hasDirectEndpoint()) {
+ return Optional.empty();
+ }
+
+ InetAddress directEndpointAddress = null;
+ try {
+ directEndpointAddress =
Inet6Address.getByName(endpointProto.getDirectEndpoint());
+ } catch (UnknownHostException e) {
+ LOG.warn(
+ "Error occurred trying to parse direct_endpoint={} into IPv6
address. Exception={}",
+ endpointProto.getDirectEndpoint(),
+ e.toString());
Review Comment:
Return here; otherwise directEndpointAddress may be null below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]