arunpandianp commented on code in PR #30233:
URL: https://github.com/apache/beam/pull/30233#discussion_r1479974199
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -59,28 +60,66 @@ public class MetricTrackingWindmillServerStub {
private final MemoryMonitor gcThrashingMonitor;
private final boolean useStreamingRequests;
+ private final WindmillStreamPool<GetDataStream> getDataStreamPool;
+
+ // May be the same instance as getDataStreamPool based upon options.
Review Comment:
```suggestion
// Maybe the same instance as getDataStreamPool based upon options.
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java:
##########
@@ -130,6 +130,24 @@ public interface StreamingDataflowWorkerOptions extends
DataflowWorkerHarnessOpt
void setWindmillMessagesBetweenIsReadyChecks(int value);
+ @Description("If true, a most a single active rpc will be used per channel.")
+ @Default.Boolean(false)
+ boolean getUseIsolatedChannels();
Review Comment:
Thoughts on naming these getUseIsolatedWindmillChannels,
getUseSeparateWindmillHeartbeatStreams, and getWindmillGetDataStreamCount to
differentiate them from other channels (e.g., BQ, GCS)?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientCall;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * A {@link ManagedChannel} that creates a dynamic # of cached channels to the
same endpoint such
+ * that each active rpc has its own channel.
+ */
+@Internal
+class IsolationChannel extends ManagedChannel {
+ static final Logger LOG = Logger.getLogger(IsolationChannel.class.getName());
+
+ private final Supplier<ManagedChannel> channelFactory;
+
+ @GuardedBy("channelCache")
+ private final List<ManagedChannel> channelCache;
+
+ @GuardedBy("channelCache")
+ private final Set<ManagedChannel> usedChannels;
+
+ @GuardedBy("channelCache")
+ private boolean shutdownStarted = false;
+
+ private final String authority;
+
+ // Expected that supplier returns channels to the same endpoint with the
same authority.
+ public static IsolationChannel create(Supplier<ManagedChannel>
channelFactory) {
+ return new IsolationChannel(channelFactory);
+ }
+
+ @VisibleForTesting
+ IsolationChannel(Supplier<ManagedChannel> channelFactory) {
+ this.channelFactory = channelFactory;
+ this.channelCache = new ArrayList<>();
+ ManagedChannel channel = channelFactory.get();
+ this.authority = channel.authority();
+ this.channelCache.add(channel);
+ this.usedChannels = new HashSet<>();
+ }
+
+ @Override
+ public String authority() {
+ return authority;
+ }
+
+ /** Pick an unused channel from the set or create one and then create a
{@link ClientCall}. */
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
+ MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions)
{
+ ManagedChannel channel = getChannel();
+ return new ReleasingClientCall<>(this, channel,
channel.newCall(methodDescriptor, callOptions));
+ }
+
+ private ManagedChannel getChannel() {
+ synchronized (channelCache) {
+ if (!channelCache.isEmpty()) {
+ ManagedChannel result = channelCache.remove(channelCache.size() - 1);
+ usedChannels.add(result);
+ return result;
+ }
+ }
+ ManagedChannel result = channelFactory.get();
+ synchronized (channelCache) {
Review Comment:
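I think there is a race where `isTerminated` could return false after
returning true:
1. Thread 1 is at `result = channelFactory.get()`.
2. Thread 2 calls `isTerminated` and gets true (shutdown has started and every
tracked channel has terminated).
3. Thread 1 makes progress, adds `result` to `usedChannels`, calls
`result.shutdown()`, and exits `getChannel`.
4. Thread 2 calls `isTerminated` again and can get false, since `result` has
been shut down but may not have terminated yet.
Is this okay?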
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -59,28 +60,66 @@ public class MetricTrackingWindmillServerStub {
private final MemoryMonitor gcThrashingMonitor;
private final boolean useStreamingRequests;
+ private final WindmillStreamPool<GetDataStream> getDataStreamPool;
+
+ // May be the same instance as getDataStreamPool based upon options.
+ private final WindmillStreamPool<GetDataStream> heartbeatStreamPool;
+
@GuardedBy("this")
private final List<ReadBatch> pendingReadBatches;
@GuardedBy("this")
private int activeReadThreads = 0;
- private WindmillStreamPool<GetDataStream> streamPool;
+ @Internal
+ @AutoBuilder(ofClass = MetricTrackingWindmillServerStub.class)
+ public abstract static class Builder {
+
+ abstract Builder setServer(WindmillServerStub server);
+
+ abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);
+
+ abstract Builder setUseStreamingRequests(boolean useStreamingRequests);
- public MetricTrackingWindmillServerStub(
- WindmillServerStub server, MemoryMonitor gcThrashingMonitor, boolean
useStreamingRequests) {
+ abstract Builder setUseSeparateHeartbeatStreams(boolean
useSeparateHeartbeatStreams);
+
+ abstract Builder setNumGetDataStreams(int numGetDataStreams);
+
+ abstract MetricTrackingWindmillServerStub build();
+ }
+
+ public static Builder builder(WindmillServerStub server, MemoryMonitor
gcThrashingMonitor) {
+ return new AutoBuilder_MetricTrackingWindmillServerStub_Builder()
+ .setServer(server)
+ .setGcThrashingMonitor(gcThrashingMonitor)
+ .setUseStreamingRequests(false)
+ .setUseSeparateHeartbeatStreams(false)
+ .setNumGetDataStreams(1);
+ }
+
+ MetricTrackingWindmillServerStub(
+ WindmillServerStub server,
+ MemoryMonitor gcThrashingMonitor,
+ boolean useStreamingRequests,
+ boolean useSeparateHeartbeatStreams,
+ int numGetDataStreams) {
this.server = server;
this.gcThrashingMonitor = gcThrashingMonitor;
- // This is used as a queue but is expected to be less than 10 batches.
- this.pendingReadBatches = new ArrayList<>();
this.useStreamingRequests = useStreamingRequests;
- }
-
- public void start() {
if (useStreamingRequests) {
- streamPool =
- WindmillStreamPool.create(NUM_STREAMS, STREAM_TIMEOUT,
this.server::getDataStream);
+ getDataStreamPool =
+ WindmillStreamPool.create(numGetDataStreams, STREAM_TIMEOUT,
this.server::getDataStream);
Review Comment:
Can we clamp numGetDataStreams to a minimum of 1?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientCall;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * A {@link ManagedChannel} that creates a dynamic # of cached channels to the
same endpoint such
+ * that each active rpc has its own channel.
+ */
+@Internal
+class IsolationChannel extends ManagedChannel {
+ static final Logger LOG = Logger.getLogger(IsolationChannel.class.getName());
+
+ private final Supplier<ManagedChannel> channelFactory;
+
+ @GuardedBy("channelCache")
Review Comment:
Is there a specific reason to guard by `channelCache`? Could it be `@GuardedBy("this")` instead?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -245,11 +284,11 @@ public void refreshActiveWork(Map<String,
List<HeartbeatRequest>> heartbeats) {
if (useStreamingRequests) {
// With streaming requests, always send the request even when it is
empty, to ensure that
// we trigger health checks for the stream even when it is idle.
Review Comment:
> // With streaming requests, always send the request even when it is empty,
to ensure that
> // we trigger health checks for the stream even when it is idle.
I was going to say we don't do this anymore for the streams in
getDataStreamPool, but it looks like this was never true.
refreshActiveWork() internally doesn't send any requests when heartbeats is
empty. Should we remove the comment?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientCall;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * A {@link ManagedChannel} that creates a dynamic # of cached channels to the
same endpoint such
+ * that each active rpc has its own channel.
+ */
+@Internal
+class IsolationChannel extends ManagedChannel {
+ static final Logger LOG = Logger.getLogger(IsolationChannel.class.getName());
+
+ private final Supplier<ManagedChannel> channelFactory;
+
+ @GuardedBy("channelCache")
+ private final List<ManagedChannel> channelCache;
+
+ @GuardedBy("channelCache")
+ private final Set<ManagedChannel> usedChannels;
+
+ @GuardedBy("channelCache")
+ private boolean shutdownStarted = false;
+
+ private final String authority;
+
+ // Expected that supplier returns channels to the same endpoint with the
same authority.
+ public static IsolationChannel create(Supplier<ManagedChannel>
channelFactory) {
+ return new IsolationChannel(channelFactory);
+ }
+
+ @VisibleForTesting
+ IsolationChannel(Supplier<ManagedChannel> channelFactory) {
+ this.channelFactory = channelFactory;
+ this.channelCache = new ArrayList<>();
+ ManagedChannel channel = channelFactory.get();
+ this.authority = channel.authority();
+ this.channelCache.add(channel);
+ this.usedChannels = new HashSet<>();
+ }
+
+ @Override
+ public String authority() {
+ return authority;
+ }
+
+ /** Pick an unused channel from the set or create one and then create a
{@link ClientCall}. */
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
+ MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions)
{
+ ManagedChannel channel = getChannel();
+ return new ReleasingClientCall<>(this, channel,
channel.newCall(methodDescriptor, callOptions));
+ }
+
+ private ManagedChannel getChannel() {
+ synchronized (channelCache) {
+ if (!channelCache.isEmpty()) {
+ ManagedChannel result = channelCache.remove(channelCache.size() - 1);
+ usedChannels.add(result);
+ return result;
+ }
+ }
+ ManagedChannel result = channelFactory.get();
+ synchronized (channelCache) {
+ usedChannels.add(result);
+ if (shutdownStarted) result.shutdown();
+ }
+ return result;
+ }
+
+ @Override
+ public ManagedChannel shutdown() {
+ synchronized (channelCache) {
+ shutdownStarted = true;
+ for (ManagedChannel channel : usedChannels) {
+ channel.shutdown();
+ }
+ for (ManagedChannel channel : channelCache) {
+ channel.shutdown();
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ synchronized (channelCache) {
+ if (!shutdownStarted) return false;
+ for (ManagedChannel channel : usedChannels) {
+ if (!channel.isShutdown()) return false;
+ }
+ for (ManagedChannel channel : channelCache) {
+ if (!channel.isShutdown()) return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ synchronized (channelCache) {
+ if (!shutdownStarted) return false;
+ for (ManagedChannel channel : usedChannels) {
+ if (!channel.isTerminated()) return false;
+ }
+ for (ManagedChannel channel : channelCache) {
+ if (!channel.isTerminated()) return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public ManagedChannel shutdownNow() {
+ synchronized (channelCache) {
+ shutdownStarted = true;
+ for (ManagedChannel channel : usedChannels) {
+ channel.shutdownNow();
+ }
+ for (ManagedChannel channel : channelCache) {
+ channel.shutdownNow();
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ long endTimeNanos = System.nanoTime() + unit.toNanos(timeout);
+ if (isTerminated()) return true;
+ synchronized (channelCache) {
+ if (!shutdownStarted) return false;
+ for (ManagedChannel channel : usedChannels) {
+ long awaitTimeNanos = endTimeNanos - System.nanoTime();
+ if (awaitTimeNanos <= 0) {
+ break;
+ }
+ channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
+ }
+ for (ManagedChannel channel : channelCache) {
+ long awaitTimeNanos = endTimeNanos - System.nanoTime();
+ if (awaitTimeNanos <= 0) {
+ break;
+ }
+ channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+ return isTerminated();
+ }
+
+ void releaseChannelAfterCall(ManagedChannel managedChannel) {
+ synchronized (channelCache) {
+ Preconditions.checkState(
+ usedChannels.remove(managedChannel), "Channel released that was not
used");
+ channelCache.add(managedChannel);
+ }
+ }
+
+ /** ClientCall wrapper that adds channel back to the cache on completion. */
+ private static class ReleasingClientCall<ReqT, RespT>
+ extends SimpleForwardingClientCall<ReqT, RespT> {
+ private final IsolationChannel isolationChannel;
+
+ private final ManagedChannel channel;
+ private final AtomicBoolean wasReleased = new AtomicBoolean();
+
+ public ReleasingClientCall(
+ IsolationChannel isolationChannel,
+ ManagedChannel channel,
+ ClientCall<ReqT, RespT> delegate) {
+ super(delegate);
+ this.isolationChannel = isolationChannel;
+ this.channel = channel;
+ }
+
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ try {
+ super.start(
+ new SimpleForwardingClientCallListener<RespT>(responseListener) {
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ try {
+ super.onClose(status, trailers);
+ } finally {
+ releaseChannelOnce();
+ }
+ }
+ },
+ headers);
+ } catch (Exception e) {
+ releaseChannelOnce();
+ throw e;
+ }
+ }
+
+ private void releaseChannelOnce() {
+ if (!wasReleased.getAndSet(true)) {
+ isolationChannel.releaseChannelAfterCall(channel);
+ } else {
+ LOG.log(
Review Comment:
```suggestion
LOG.warning(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]