This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3693174c042 [Dataflow Streaming] Add Channelz status page exporting
GRPC channelz data (#30211)
3693174c042 is described below
commit 3693174c0421d0ff049042ca283db633431892ef
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Feb 16 03:51:01 2024 -0800
[Dataflow Streaming] Add Channelz status page exporting GRPC channelz data
(#30211)
Co-authored-by: Arun Pandian <[email protected]>
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
.../options/DataflowPipelineDebugOptions.java | 6 +
.../dataflow/worker/StreamingDataflowWorker.java | 10 +
.../worker/windmill/WindmillServerBase.java | 6 +
.../worker/windmill/WindmillServerStub.java | 6 +
.../windmill/client/grpc/ChannelzServlet.java | 292 +++++++++++++++++++++
.../windmill/client/grpc/GrpcDispatcherClient.java | 4 +
.../windmill/client/grpc/GrpcWindmillServer.java | 5 +
.../client/grpc/stubs/WindmillChannelFactory.java | 2 +
.../dataflow/worker/FakeWindmillServer.java | 22 +-
.../windmill/client/grpc/ChannelzServletTest.java | 104 ++++++++
11 files changed, 454 insertions(+), 5 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index fe80b826c56..2376a2c9bbc 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -906,7 +906,7 @@ class BeamModulePlugin implements Plugin<Project> {
testcontainers_rabbitmq :
"org.testcontainers:rabbitmq:$testcontainers_version",
truth :
"com.google.truth:truth:1.1.5",
threetenbp :
"org.threeten:threetenbp:1.6.8",
- vendored_grpc_1_60_1 :
"org.apache.beam:beam-vendor-grpc-1_60_1:0.1",
+ vendored_grpc_1_60_1 :
"org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
vendored_guava_32_1_2_jre :
"org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1",
vendored_calcite_1_28_0 :
"org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
woodstox_core_asl :
"org.codehaus.woodstox:woodstox-core-asl:4.4.1",
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 888b0d3f0b6..9b06fa9b7e2 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -311,6 +311,12 @@ public interface DataflowPipelineDebugOptions
void setWindmillGetDataStreamCount(int value);
+ @Description("If true, will only show windmill service channels on
/channelz")
+ @Default.Boolean(true)
+ boolean getChannelzShowOnlyWindmillServiceChannels();
+
+ void setChannelzShowOnlyWindmillServiceChannels(boolean value);
+
/**
* The amount of time before UnboundedReaders are considered idle and closed
during streaming
* execution.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 825c3fb78c7..2e0156bae77 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -114,6 +114,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApp
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
@@ -211,6 +212,7 @@ public class StreamingDataflowWorker {
private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION =
Duration.standardMinutes(5);
private static final Random clientIdGenerator = new Random();
+ private static final String CHANNELZ_PATH = "/channelz";
final WindmillStateCache stateCache;
// Maps from computation ids to per-computation state.
private final ConcurrentMap<String, ComputationState> computationMap;
@@ -735,6 +737,13 @@ public class StreamingDataflowWorker {
if (debugCaptureManager != null) {
debugCaptureManager.start();
}
+
+ if (windmillServiceEnabled) {
+ ChannelzServlet channelzServlet = new ChannelzServlet(CHANNELZ_PATH,
options, windmillServer);
+ statusPages.addServlet(channelzServlet);
+ statusPages.addCapturePage(channelzServlet);
+ }
+
statusPages.addServlet(stateCache.statusServlet());
statusPages.addServlet(new SpecsServlet());
@@ -2081,6 +2090,7 @@ public class StreamingDataflowWorker {
}
private class MetricsDataProvider implements StatusDataProvider {
+
@Override
public void appendSummaryHtml(PrintWriter writer) {
writer.println(workUnitExecutor.summaryHtml());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
index 8caa79cd3f7..a1160b4f98d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java
@@ -23,6 +23,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Co
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
/**
@@ -53,6 +54,11 @@ public class WindmillServerBase extends WindmillServerStub {
// This class is used for windmill appliance and local runner tests.
}
+  /**
+   * Always returns an empty set: this stub backs the Windmill appliance and local runner tests,
+   * which do not record Windmill service endpoints.
+   */
+  @Override
+  public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+    return ImmutableSet.<HostAndPort>of();
+  }
+
@Override
public boolean isReady() {
return true;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index c327e68d7e9..34461ab471f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -25,6 +25,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Co
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
/** Stub for communicating with a Windmill server. */
@@ -40,6 +41,11 @@ public abstract class WindmillServerStub implements
StatusDataProvider {
*/
public abstract void setWindmillServiceEndpoints(Set<HostAndPort> endpoints)
throws IOException;
+ /**
+  * Returns the Windmill service endpoints set by {@link #setWindmillServiceEndpoints(Set)}.
+  *
+  * <p>Callers iterate over the result (the channelz status page streams over it to filter
+  * channels), so implementations should return an empty set rather than {@code null} when no
+  * endpoints have been set yet.
+  */
+ public abstract ImmutableSet<HostAndPort> getWindmillServiceEndpoints();
+
/** Returns true iff this WindmillServerStub is ready for making API calls.
*/
public abstract boolean isReady();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java
new file mode 100644
index 00000000000..9ab02788603
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
+import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
+
+/** Respond to /path with the GRPC channelz data. */
+@Internal
+public class ChannelzServlet extends BaseStatusServlet implements
DebugCapture.Capturable {
+
+ private static final int MAX_TOP_CHANNELS_TO_RETURN = 500;
+
+ private final ChannelzService channelzService;
+ private final WindmillServerStub windmillServerStub;
+ private final boolean showOnlyWindmillServiceChannels;
+
+ public ChannelzServlet(
+ String path, StreamingDataflowWorkerOptions options, WindmillServerStub
windmillServerStub) {
+ super(path);
+ channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN);
+ this.windmillServerStub = windmillServerStub;
+ showOnlyWindmillServiceChannels =
options.getChannelzShowOnlyWindmillServiceChannels();
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse
response)
+ throws IOException, ServletException {
+ response.setStatus(HttpServletResponse.SC_OK);
+ PrintWriter writer = response.getWriter();
+ captureData(writer);
+ }
+
+ @Override
+ public String pageName() {
+ return getPath();
+ }
+
+ @Override
+ public void captureData(PrintWriter writer) {
+ writer.println("<html>");
+ writer.println("<h1>Channelz</h1>");
+ appendTopChannels(writer);
+ writer.println("</html>");
+ }
+
+ // channelz proto says there won't be cycles in the ref graph.
+ // we track visited ids to be defensive and prevent any accidental cycles.
+ private static class VisitedSets {
+
+ Set<Long> channels = new HashSet<>();
+ Set<Long> subchannels = new HashSet<>();
+ }
+
+ private void appendTopChannels(PrintWriter writer) {
+ SettableFuture<GetTopChannelsResponse> future = SettableFuture.create();
+ // IDEA: If there are more than MAX_TOP_CHANNELS_TO_RETURN top channels
+ // in the worker, we might not return all the windmill channels. If we run
into
+ // such situations, this logic can be modified to loop till we see an empty
+ // GetTopChannelsResponse response with the end bit set.
+ channelzService.getTopChannels(
+ GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future));
+ GetTopChannelsResponse topChannelsResponse;
+ try {
+ topChannelsResponse = future.get();
+ } catch (Exception e) {
+ String msg = "Failed to get channelz: " + e.getMessage();
+ writer.println(msg);
+ return;
+ }
+
+ List<Channel> topChannels = topChannelsResponse.getChannelList();
+ if (showOnlyWindmillServiceChannels) {
+ topChannels = filterWindmillChannels(topChannels);
+ }
+ writer.println("<h2>Top Level Channels</h2>");
+ writer.println("<table border='1'>");
+ VisitedSets visitedSets = new VisitedSets();
+ for (Channel channel : topChannels) {
+ writer.println("<tr>");
+ writer.println("<td>");
+ writer.println("TopChannelId: " + channel.getRef().getChannelId());
+ writer.println("</td>");
+ writer.println("<td>");
+ appendChannel(channel, writer, visitedSets);
+ writer.println("</td>");
+ writer.println("</tr>");
+ }
+ writer.println("</table>");
+ }
+
+ private List<Channel> filterWindmillChannels(List<Channel> channels) {
+ ImmutableSet<HostAndPort> windmillServiceEndpoints =
+ windmillServerStub.getWindmillServiceEndpoints();
+ Set<String> windmillServiceHosts =
+
windmillServiceEndpoints.stream().map(HostAndPort::getHost).collect(Collectors.toSet());
+ List<Channel> windmillChannels = new ArrayList<>();
+ for (Channel channel : channels) {
+ for (String windmillServiceHost : windmillServiceHosts) {
+ if (channel.getData().getTarget().contains(windmillServiceHost)) {
+ windmillChannels.add(channel);
+ break;
+ }
+ }
+ }
+ return windmillChannels;
+ }
+
+ private void appendChannels(
+ List<ChannelRef> channelRefs, PrintWriter writer, VisitedSets
visitedSets) {
+ for (ChannelRef channelRef : channelRefs) {
+ writer.println("<tr>");
+ writer.println("<td>");
+ writer.println("Channel: " + channelRef.getChannelId());
+ writer.println("</td>");
+ writer.println("<td>");
+ appendChannel(channelRef, writer, visitedSets);
+ writer.println("</td>");
+ writer.println("</tr>");
+ }
+ }
+
+ private void appendChannel(ChannelRef channelRef, PrintWriter writer,
VisitedSets visitedSets) {
+ if (visitedSets.channels.contains(channelRef.getChannelId())) {
+ String msg = "Duplicate Channel Id: " + channelRef;
+ writer.println(msg);
+ return;
+ }
+ visitedSets.channels.add(channelRef.getChannelId());
+ SettableFuture<GetChannelResponse> future = SettableFuture.create();
+ channelzService.getChannel(
+
GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(),
+ getStreamObserver(future));
+ Channel channel;
+ try {
+ channel = future.get().getChannel();
+ } catch (Exception e) {
+ String msg = "Failed to get Channel: " + channelRef;
+ writer.println(msg + " Exception: " + e.getMessage());
+ return;
+ }
+ appendChannel(channel, writer, visitedSets);
+ }
+
+ private void appendChannel(Channel channel, PrintWriter writer, VisitedSets
visitedSets) {
+ writer.println("<table border='1'>");
+ writer.println("<tr>");
+ writer.println("<td>");
+ writer.println("ChannelId: " + channel.getRef().getChannelId());
+ writer.println("</td>");
+ writer.println("<td><pre>" + channel);
+ writer.println("</pre></td>");
+ writer.println("</tr>");
+ appendChannels(channel.getChannelRefList(), writer, visitedSets);
+ appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets);
+ appendSockets(channel.getSocketRefList(), writer);
+ writer.println("</table>");
+ }
+
+ private void appendSubChannels(
+ List<SubchannelRef> subchannelRefList, PrintWriter writer, VisitedSets
visitedSets) {
+ for (SubchannelRef subchannelRef : subchannelRefList) {
+ writer.println("<tr>");
+ writer.println("<td>");
+ writer.println("Sub Channel: " + subchannelRef.getSubchannelId());
+ writer.println("</td>");
+ writer.println("<td>");
+ appendSubchannel(subchannelRef, writer, visitedSets);
+ writer.println("</td>");
+ writer.println("</tr>");
+ }
+ }
+
+ private void appendSubchannel(
+ SubchannelRef subchannelRef, PrintWriter writer, VisitedSets
visitedSets) {
+ if (visitedSets.subchannels.contains(subchannelRef.getSubchannelId())) {
+ String msg = "Duplicate Subchannel Id: " + subchannelRef;
+ writer.println(msg);
+ return;
+ }
+ visitedSets.subchannels.add(subchannelRef.getSubchannelId());
+ SettableFuture<GetSubchannelResponse> future = SettableFuture.create();
+ channelzService.getSubchannel(
+
GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()).build(),
+ getStreamObserver(future));
+ Subchannel subchannel;
+ try {
+ subchannel = future.get().getSubchannel();
+ } catch (Exception e) {
+ String msg = "Failed to get Subchannel: " + subchannelRef;
+ writer.println(msg + " Exception: " + e.getMessage());
+ return;
+ }
+
+ writer.println("<table border='1'>");
+ writer.println("<tr>");
+ writer.println("<td>SubchannelId: " + subchannelRef.getSubchannelId());
+ writer.println("</td>");
+ writer.println("<td><pre>" + subchannel.toString());
+ writer.println("</pre></td>");
+ writer.println("</tr>");
+ appendChannels(subchannel.getChannelRefList(), writer, visitedSets);
+ appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets);
+ appendSockets(subchannel.getSocketRefList(), writer);
+ writer.println("</table>");
+ }
+
+ private void appendSockets(List<SocketRef> socketRefList, PrintWriter
writer) {
+ for (SocketRef socketRef : socketRefList) {
+ writer.println("<tr>");
+ writer.println("<td>");
+ writer.println("Socket: " + socketRef.getSocketId());
+ writer.println("</td>");
+ writer.println("<td>");
+ appendSocket(socketRef, writer);
+ writer.println("</td>");
+ writer.println("</tr>");
+ }
+ }
+
+ private void appendSocket(SocketRef socketRef, PrintWriter writer) {
+ SettableFuture<GetSocketResponse> future = SettableFuture.create();
+ channelzService.getSocket(
+
GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()).build(),
+ getStreamObserver(future));
+ Socket socket;
+ try {
+ socket = future.get().getSocket();
+ } catch (Exception e) {
+ String msg = "Failed to get Socket: " + socketRef;
+ writer.println(msg + " Exception: " + e.getMessage());
+ return;
+ }
+ writer.println("<pre>" + socket + "</pre>");
+ }
+
+ private <T> StreamObserver<T> getStreamObserver(SettableFuture<T> future) {
+ return new StreamObserver<T>() {
+ @Nullable T response = null;
+
+ @Override
+ public void onNext(T message) {
+ response = message;
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ future.setException(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ future.set(response);
+ }
+ };
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index aa15e0a5e1a..845d54588e7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -97,6 +97,10 @@ class GrpcDispatcherClient {
: randomlySelectNextStub(windmillServiceStubs));
}
+ /**
+  * Returns the dispatcher endpoints recorded in the current {@code dispatcherStubs} value. Used by
+  * GrpcWindmillServer#getWindmillServiceEndpoints.
+  */
+ ImmutableSet<HostAndPort> getDispatcherEndpoints() {
+ return dispatcherStubs.get().dispatcherEndpoints();
+ }
+
CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs =
dispatcherStubs.get().windmillMetadataServiceStubs();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 858aeb15985..f94fc09ac53 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -267,6 +267,11 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
}
+  /**
+   * Returns the endpoints currently held by the dispatcher client. These are fed by {@link
+   * #setWindmillServiceEndpoints(Set)}, which hands them to {@code
+   * consumeWindmillDispatcherEndpoints}.
+   */
+  @Override
+  public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+    ImmutableSet<HostAndPort> dispatcherEndpoints = dispatcherClient.getDispatcherEndpoints();
+    return dispatcherEndpoints;
+  }
+
@Override
public boolean isReady() {
return dispatcherClient.hasInitializedEndpoints();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
index cf31436d364..d8e4c064e97 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java
@@ -37,6 +37,7 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPor
public final class WindmillChannelFactory {
public static final String LOCALHOST = "localhost";
private static final int DEFAULT_GRPC_PORT = 443;
+ private static final int MAX_REMOTE_TRACE_EVENTS = 100;
private WindmillChannelFactory() {}
@@ -139,6 +140,7 @@ public final class WindmillChannelFactory {
return channelBuilder
.maxInboundMessageSize(Integer.MAX_VALUE)
+ .maxTraceEvents(MAX_REMOTE_TRACE_EVENTS)
.maxInboundMetadataSize(1024 * 1024);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index 069fcac07c8..e4985193d1c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -43,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
+import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -64,6 +64,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Ge
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Duration;
@@ -73,7 +74,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** An in-memory Windmill server that offers provided work and data. */
-final class FakeWindmillServer extends WindmillServerStub {
+public class FakeWindmillServer extends WindmillServerStub {
private static final Logger LOG =
LoggerFactory.getLogger(FakeWindmillServer.class);
private final ResponseQueue<Windmill.GetWorkRequest,
Windmill.GetWorkResponse> workToOffer;
private final ResponseQueue<GetDataRequest, GetDataResponse> dataToOffer;
@@ -91,6 +92,9 @@ final class FakeWindmillServer extends WindmillServerStub {
private boolean dropStreamingCommits = false;
private final Consumer<List<Windmill.ComputationHeartbeatResponse>>
processHeartbeatResponses;
+  // Endpoints recorded by setWindmillServiceEndpoints. Initialized to an empty set rather than
+  // null so getWindmillServiceEndpoints() honours its non-null ImmutableSet return type (callers
+  // such as ChannelzServlet stream over it) before any endpoints have been set.
+  @GuardedBy("this")
+  private ImmutableSet<HostAndPort> dispatcherEndpoints = ImmutableSet.of();
+
public FakeWindmillServer(
ErrorCollector errorCollector,
Function<String, Optional<ComputationState>> computationStateFetcher) {
@@ -475,8 +479,18 @@ final class FakeWindmillServer extends WindmillServerStub {
}
@Override
- public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) throws
IOException {
- isReady = true;
+ public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
+ synchronized (this) {
+ this.dispatcherEndpoints = ImmutableSet.copyOf(endpoints);
+ isReady = true;
+ }
+ }
+
+ @Override
+ public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+ synchronized (this) {
+ return dispatcherEndpoints;
+ }
}
@Override
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java
new file mode 100644
index 00000000000..3ec951d9c14
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static org.junit.Assert.*;
+
+import java.io.*;
+import java.util.Optional;
+import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
+import
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelzServletTest {
+
+  private static final String WINDMILL_HOST_1 = "WindmillHost1";
+  private static final String WINDMILL_HOST_2 = "WindmillHost2";
+  // Deliberately contains WINDMILL_HOST_1 as a substring.
+  private static final String NON_WINDMILL_HOST_1 = "NonWindmillHost1";
+  private static final String SOME_OTHER_HOST = "SomeOtherHost2";
+
+  @Test
+  public void testRendersAllChannels() {
+    String channelzData = renderChannelz(false);
+    assertTrue(channelzData.contains(WINDMILL_HOST_1));
+    assertTrue(channelzData.contains(WINDMILL_HOST_2));
+    assertTrue(channelzData.contains(NON_WINDMILL_HOST_1));
+    assertTrue(channelzData.contains(SOME_OTHER_HOST));
+  }
+
+  @Test
+  public void testRendersOnlyWindmillChannels() {
+    String channelzData = renderChannelz(true);
+    assertTrue(channelzData.contains(WINDMILL_HOST_1));
+    assertTrue(channelzData.contains(WINDMILL_HOST_2));
+    // The filter does a substring match on the channel target, so NonWindmillHost1 is kept
+    // because it contains WindmillHost1, which is a Windmill host.
+    assertTrue(channelzData.contains(NON_WINDMILL_HOST_1));
+    assertFalse(channelzData.contains(SOME_OTHER_HOST));
+  }
+
+  /**
+   * Creates in-process channels for two Windmill hosts and two other hosts, and registers the two
+   * Windmill hosts as service endpoints on a fake server. Returns the channelz page rendered with
+   * the given filter setting, and shuts the channels down afterwards.
+   */
+  private static String renderChannelz(boolean showOnlyWindmillServiceChannels) {
+    ManagedChannel[] channels =
+        new ManagedChannel[] {
+          InProcessChannelBuilder.forName(WINDMILL_HOST_1).build(),
+          InProcessChannelBuilder.forName(WINDMILL_HOST_2).build(),
+          InProcessChannelBuilder.forName(NON_WINDMILL_HOST_1).build(),
+          InProcessChannelBuilder.forName(SOME_OTHER_HOST).build()
+        };
+    try {
+      StreamingDataflowWorkerOptions options =
+          PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class);
+      options.setChannelzShowOnlyWindmillServiceChannels(showOnlyWindmillServiceChannels);
+      FakeWindmillServer fakeWindmillServer =
+          new FakeWindmillServer(new ErrorCollector(), s -> Optional.empty());
+      fakeWindmillServer.setWindmillServiceEndpoints(
+          ImmutableSet.of(
+              HostAndPort.fromHost(WINDMILL_HOST_1), HostAndPort.fromHost(WINDMILL_HOST_2)));
+      ChannelzServlet channelzServlet =
+          new ChannelzServlet("/channelz", options, fakeWindmillServer);
+      StringWriter stringWriter = new StringWriter();
+      PrintWriter writer = new PrintWriter(stringWriter);
+      channelzServlet.captureData(writer);
+      writer.flush();
+      return stringWriter.toString();
+    } finally {
+      // Release the channels instead of leaving them open for the rest of the test JVM.
+      for (ManagedChannel channel : channels) {
+        channel.shutdownNow();
+      }
+    }
+  }
+}