scwhittle commented on code in PR #32775:
URL: https://github.com/apache/beam/pull/32775#discussion_r1802726168
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
Review Comment:
I think this can be removed after removing get().
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
@Override
public synchronized void start() {
- Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
- // Starts the stream, this value is memoized.
- getWorkerMetadataStream.get();
- startWorkerMetadataConsumer();
- getWorkBudgetRefresher.start();
+ Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
+ getWorkerMetadataStream =
+ streamFactory.createGetWorkerMetadataStream(
+ dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+ getWorkerMetadataThrottleTimer,
+ this::consumeWorkerMetadata);
started = true;
}
public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
- return connections.get().windmillConnections().keySet().stream()
+ return backends.get().windmillStreams().keySet().stream()
.map(Endpoint::directEndpoint)
.filter(Optional::isPresent)
.map(Optional::get)
- .filter(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() != WindmillServiceAddress.Kind.IPV6)
- .map(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() == WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
- ? windmillServiceAddress.gcpServiceAddress()
- : windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+ .map(WindmillServiceAddress::getServiceAddress)
.collect(toImmutableSet());
}
/**
- * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or defaults to {@link
- * GetDataStream} pointing to dispatcher.
+ * Fetches {@link GetDataStream} mapped to globalDataKey, or throws {@link
+ * NoSuchElementException} if one is not found.
*/
private GetDataStream getGlobalDataStream(String globalDataKey) {
- return Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
- .map(Supplier::get)
- .orElseGet(
- () ->
- streamFactory.createGetDataStream(
- dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
+ return Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+ .map(GlobalDataStreamSender::get)
+ .orElseThrow(
+ () -> new NoSuchElementException("No endpoint for global data tag: " + globalDataKey));
}
@VisibleForTesting
@Override
public synchronized void shutdown() {
- Preconditions.checkState(started, "StreamingEngineClient never started.");
- getWorkerMetadataStream.get().halfClose();
- getWorkBudgetRefresher.stop();
- newWorkerMetadataPublisher.shutdownNow();
- newWorkerMetadataConsumer.shutdownNow();
+ Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started.");
+ Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+ workerMetadataConsumer.shutdownNow();
Review Comment:
should this shutdown all the stream senders (perhaps could call
closeStaleStreams(emptyBackends))
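A rough sketch of that idea, purely for illustration (closeStaleStreams is the private helper added later in this PR; WindmillEndpoints.none() is a hypothetical empty-endpoints factory, not something this diff confirms):

    @Override
    public synchronized void shutdown() {
      Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started.");
      Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
      workerMetadataConsumer.shutdownNow();
      // Treat an empty endpoint snapshot as the new target so every live
      // WindmillStreamSender/GlobalDataStreamSender is considered stale and gets closed.
      closeStaleStreams(WindmillEndpoints.none());
      channelCachingStubFactory.shutdown();
    }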
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -56,10 +56,11 @@ public interface WindmillStream {
@ThreadSafe
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
- void adjustBudget(long itemsDelta, long bytesDelta);
+ void setBudget(long newItems, long newBytes);
- /** Returns the remaining in-flight {@link GetWorkBudget}. */
- GetWorkBudget remainingBudget();
+ default void setBudget(GetWorkBudget newBudget) {
Review Comment:
should this be the virtual method and the long variant defaulted to making a
budget?
The implementation makes a budget object anyway.
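For illustration, the flip being suggested could look roughly like this (a sketch only; the builder calls mirror GetWorkBudget usage elsewhere in this PR, and this is not the committed interface):

    @ThreadSafe
    interface GetWorkStream extends WindmillStream {
      /** Sets the {@link GetWorkBudget} for the stream; implementations receive the budget object. */
      void setBudget(GetWorkBudget newBudget);

      /** Convenience overload that wraps the primitives and delegates to the object variant. */
      default void setBudget(long newItems, long newBytes) {
        setBudget(GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build());
      }
    }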
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
@Override
public synchronized void start() {
- Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
- // Starts the stream, this value is memoized.
- getWorkerMetadataStream.get();
- startWorkerMetadataConsumer();
- getWorkBudgetRefresher.start();
+ Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
+ getWorkerMetadataStream =
+ streamFactory.createGetWorkerMetadataStream(
+ dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+ getWorkerMetadataThrottleTimer,
+ this::consumeWorkerMetadata);
started = true;
}
public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
- return connections.get().windmillConnections().keySet().stream()
+ return backends.get().windmillStreams().keySet().stream()
.map(Endpoint::directEndpoint)
.filter(Optional::isPresent)
.map(Optional::get)
- .filter(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() != WindmillServiceAddress.Kind.IPV6)
- .map(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() == WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
- ? windmillServiceAddress.gcpServiceAddress()
- : windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+ .map(WindmillServiceAddress::getServiceAddress)
.collect(toImmutableSet());
}
/**
- * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or defaults to {@link
- * GetDataStream} pointing to dispatcher.
+ * Fetches {@link GetDataStream} mapped to globalDataKey, or throws {@link
+ * NoSuchElementException} if one is not found.
*/
private GetDataStream getGlobalDataStream(String globalDataKey) {
- return Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
- .map(Supplier::get)
- .orElseGet(
- () ->
- streamFactory.createGetDataStream(
- dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
+ return Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+ .map(GlobalDataStreamSender::get)
+ .orElseThrow(
+ () -> new NoSuchElementException("No endpoint for global data tag: " + globalDataKey));
}
@VisibleForTesting
@Override
public synchronized void shutdown() {
- Preconditions.checkState(started, "StreamingEngineClient never started.");
- getWorkerMetadataStream.get().halfClose();
- getWorkBudgetRefresher.stop();
- newWorkerMetadataPublisher.shutdownNow();
- newWorkerMetadataConsumer.shutdownNow();
+ Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started.");
+ Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
Review Comment:
seems like we should abort/close the stream below if it doesn't terminate
within some amount of time after half-closing.
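One possible shape for that, assuming WindmillStream exposes awaitTermination and some way to force-close (a sketch; the exact method names and the timeout value are assumptions, not the actual API):

    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
    try {
      // Bound how long we wait for the server to finish the stream after half-closing,
      // then force-close it so shutdown cannot hang on an unresponsive stream.
      if (!getWorkerMetadataStream.awaitTermination(30, TimeUnit.SECONDS)) {
        getWorkerMetadataStream.shutdown();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      getWorkerMetadataStream.shutdown();
    }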
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
@Override
public synchronized void start() {
- Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
- // Starts the stream, this value is memoized.
- getWorkerMetadataStream.get();
- startWorkerMetadataConsumer();
- getWorkBudgetRefresher.start();
+ Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
+ getWorkerMetadataStream =
+ streamFactory.createGetWorkerMetadataStream(
+ dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+ getWorkerMetadataThrottleTimer,
+ this::consumeWorkerMetadata);
started = true;
}
public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
- return connections.get().windmillConnections().keySet().stream()
+ return backends.get().windmillStreams().keySet().stream()
.map(Endpoint::directEndpoint)
.filter(Optional::isPresent)
.map(Optional::get)
- .filter(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() != WindmillServiceAddress.Kind.IPV6)
- .map(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() == WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
- ? windmillServiceAddress.gcpServiceAddress()
- : windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+ .map(WindmillServiceAddress::getServiceAddress)
.collect(toImmutableSet());
}
/**
- * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or defaults to {@link
- * GetDataStream} pointing to dispatcher.
+ * Fetches {@link GetDataStream} mapped to globalDataKey, or throws {@link
+ * NoSuchElementException} if one is not found.
*/
private GetDataStream getGlobalDataStream(String globalDataKey) {
- return Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
- .map(Supplier::get)
- .orElseGet(
- () ->
- streamFactory.createGetDataStream(
- dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
+ return Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+ .map(GlobalDataStreamSender::get)
+ .orElseThrow(
+ () -> new NoSuchElementException("No endpoint for global data tag: " + globalDataKey));
}
@VisibleForTesting
@Override
public synchronized void shutdown() {
Review Comment:
should we also shutdown the windmillStreamManager? (after possibly closing
streams below)
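A hedged sketch of that suggestion, assuming windmillStreamManager (named in the comment above) is the ExecutorService used to start and send on the per-endpoint streams; the timeout value is arbitrary:

      // After the per-endpoint streams have been closed, stop accepting new stream work
      // and interrupt anything still running so shutdown does not leak threads.
      windmillStreamManager.shutdown();
      try {
        if (!windmillStreamManager.awaitTermination(10, TimeUnit.SECONDS)) {
          windmillStreamManager.shutdownNow();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        windmillStreamManager.shutdownNow();
      }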
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java:
##########
@@ -61,38 +43,17 @@ public <T extends GetWorkBudgetSpender> void distributeBudget(
return;
}
- Map<T, GetWorkBudget> desiredBudgets = computeDesiredBudgets(budgetOwners, getWorkBudget);
-
- for (Entry<T, GetWorkBudget> streamAndDesiredBudget : desiredBudgets.entrySet()) {
- GetWorkBudgetSpender getWorkBudgetSpender = streamAndDesiredBudget.getKey();
- GetWorkBudget desired = streamAndDesiredBudget.getValue();
- GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget();
- if (isBelowFiftyPercentOfTarget(remaining, desired)) {
- GetWorkBudget adjustment = desired.subtract(remaining);
- getWorkBudgetSpender.adjustBudget(adjustment);
- }
- }
+ GetWorkBudget budgetPerStream = computeDesiredBudgets(budgetSpenders, getWorkBudget);
+ budgetSpenders.forEach(getWorkBudgetSpender -> getWorkBudgetSpender.setBudget(budgetPerStream));
}
- private <T extends GetWorkBudgetSpender> ImmutableMap<T, GetWorkBudget> computeDesiredBudgets(
+ private <T extends GetWorkBudgetSpender> GetWorkBudget computeDesiredBudgets(
ImmutableCollection<T> streams, GetWorkBudget totalGetWorkBudget) {
- GetWorkBudget activeWorkBudget = activeWorkBudgetSupplier.get();
- LOG.info("Current active work budget: {}", activeWorkBudget);
// TODO: Fix possibly non-deterministic handing out of budgets.
Review Comment:
remove; this doesn't seem like the same issue now that we are not doing deltas
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java:
##########
@@ -67,7 +64,8 @@ public static ManagedChannel remoteChannel(
windmillServiceRpcChannelTimeoutSec);
default:
throw new UnsupportedOperationException(
- "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS
are supported WindmillServiceAddresses.");
+ "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS
are supported"
Review Comment:
remove IPV6 from comment
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
@Override
public synchronized void start() {
- Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
- // Starts the stream, this value is memoized.
- getWorkerMetadataStream.get();
- startWorkerMetadataConsumer();
- getWorkBudgetRefresher.start();
+ Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
+ getWorkerMetadataStream =
+ streamFactory.createGetWorkerMetadataStream(
+ dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+ getWorkerMetadataThrottleTimer,
+ this::consumeWorkerMetadata);
started = true;
}
public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
- return connections.get().windmillConnections().keySet().stream()
+ return backends.get().windmillStreams().keySet().stream()
.map(Endpoint::directEndpoint)
.filter(Optional::isPresent)
.map(Optional::get)
- .filter(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() != WindmillServiceAddress.Kind.IPV6)
- .map(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() == WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
- ? windmillServiceAddress.gcpServiceAddress()
- : windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+ .map(WindmillServiceAddress::getServiceAddress)
.collect(toImmutableSet());
}
/**
- * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or defaults to {@link
- * GetDataStream} pointing to dispatcher.
+ * Fetches {@link GetDataStream} mapped to globalDataKey, or throws {@link
+ * NoSuchElementException} if one is not found.
*/
private GetDataStream getGlobalDataStream(String globalDataKey) {
- return Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
- .map(Supplier::get)
- .orElseGet(
- () ->
- streamFactory.createGetDataStream(
- dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
+ return Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+ .map(GlobalDataStreamSender::get)
+ .orElseThrow(
+ () -> new NoSuchElementException("No endpoint for global data tag: " + globalDataKey));
}
@VisibleForTesting
@Override
public synchronized void shutdown() {
- Preconditions.checkState(started, "StreamingEngineClient never started.");
- getWorkerMetadataStream.get().halfClose();
- getWorkBudgetRefresher.stop();
- newWorkerMetadataPublisher.shutdownNow();
- newWorkerMetadataConsumer.shutdownNow();
+ Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started.");
+ Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+ workerMetadataConsumer.shutdownNow();
channelCachingStubFactory.shutdown();
}
- /**
- * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
- * new backend worker metadata.
- */
+ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+ synchronized (metadataLock) {
+ // Only process versions greater than what we currently have to prevent double processing of
+ // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+ if (windmillEndpoints.version() > pendingMetadataVersion) {
+ pendingMetadataVersion = windmillEndpoints.version();
+ workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+ }
+ }
+ }
+
private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
- isBudgetRefreshPaused.set(true);
- LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
- ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
- createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+ // Since this is run on a single threaded executor, multiple versions of the metadata may be
+ // queued up while a previous version of the windmillEndpoints was being consumed. Only consume
+ // the endpoints if they are the most current version.
+ synchronized (metadataLock) {
+ if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+ return;
+ }
+ }
- StreamingEngineConnectionState newConnectionsState =
- StreamingEngineConnectionState.builder()
- .setWindmillConnections(newWindmillConnections)
- .setWindmillStreams(
- closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+ LOG.debug(
+ "Consuming new endpoints: {}. previous metadata version: {}, current
metadata version: {}",
+ newWindmillEndpoints,
+ activeMetadataVersion,
+ newWindmillEndpoints.version());
+ closeStaleStreams(newWindmillEndpoints);
+ ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+ createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
+ StreamingEngineBackends newBackends =
+ StreamingEngineBackends.builder()
+ .setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
+ backends.set(newBackends);
+ getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+ activeMetadataVersion = newWindmillEndpoints.version();
+ }
+
+ /** Close the streams that are no longer valid asynchronously. */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) {
+ StreamingEngineBackends currentBackends = backends.get();
+ ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams =
Review Comment:
remove the variable and just do currentBackends.windmillStreams().entrySet()
... below
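In other words, something along these lines (a sketch only: the filter/close body is assumed since the rest of closeStaleStreams is not shown in this hunk, windmillEndpoints() is treated as a set, and closeAllStreams is a guess at the sender's close method):

    /** Close the streams that are no longer valid asynchronously. */
    @SuppressWarnings("FutureReturnValueIgnored")
    private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) {
      StreamingEngineBackends currentBackends = backends.get();
      currentBackends.windmillStreams().entrySet().stream()
          // Keep only senders whose endpoint is absent from the new worker metadata.
          .filter(entry -> !newWindmillEndpoints.windmillEndpoints().contains(entry.getKey()))
          .forEach(entry -> windmillStreamManager.execute(entry.getValue()::closeAllStreams));
    }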
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -267,25 +277,100 @@ protected void startThrottleTimer() {
}
@Override
- public void adjustBudget(long itemsDelta, long bytesDelta) {
- GetWorkBudget adjustment =
- nextBudgetAdjustment
- // Get the current value, and reset the nextBudgetAdjustment. This will be set again
- // when adjustBudget is called.
- .getAndUpdate(unused -> GetWorkBudget.noBudget())
- .apply(itemsDelta, bytesDelta);
- sendRequestExtension(adjustment);
+ public void setBudget(long newItems, long newBytes) {
+ GetWorkBudget currentMaxGetWorkBudget =
+ maxGetWorkBudget.updateAndGet(
+ ignored -> GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build());
+ GetWorkBudget extension = budgetTracker.computeBudgetExtension(currentMaxGetWorkBudget);
+ maybeSendRequestExtension(extension);
}
- @Override
- public GetWorkBudget remainingBudget() {
- // Snapshot the current budgets.
- GetWorkBudget currentPendingResponseBudget = pendingResponseBudget.get();
- GetWorkBudget currentNextBudgetAdjustment = nextBudgetAdjustment.get();
- GetWorkBudget currentInflightBudget = inFlightBudget.get();
-
- return currentPendingResponseBudget
- .apply(currentNextBudgetAdjustment)
- .apply(currentInflightBudget);
+ /**
+ * Tracks sent and received GetWorkBudget and uses this information to generate request
+ * extensions.
+ */
+ @AutoValue
+ abstract static class GetWorkBudgetTracker {
+
+ private static GetWorkBudgetTracker create() {
+ return new AutoValue_GrpcDirectGetWorkStream_GetWorkBudgetTracker(
+ new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong());
+ }
+
+ abstract AtomicLong itemsRequested();
Review Comment:
how about just using synchronized instead of lots of separate atomics? Multiple atomic ops might be worse for performance anyway, and it means we might have weird races where they are inconsistently updated.
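As a rough illustration of the synchronized alternative (field names follow the tracker in this diff, but the exact counters and update methods are assumptions, as is GetWorkBudget exposing items()/bytes() accessors):

    @ThreadSafe
    static final class GetWorkBudgetTracker {
      @GuardedBy("this") private long itemsRequested;
      @GuardedBy("this") private long bytesRequested;
      @GuardedBy("this") private long itemsReceived;
      @GuardedBy("this") private long bytesReceived;

      /** Records a sent extension and returns the outstanding budget under a single lock. */
      synchronized GetWorkBudget recordBudgetRequested(GetWorkBudget extension) {
        itemsRequested += extension.items();
        bytesRequested += extension.bytes();
        return GetWorkBudget.builder()
            .setItems(itemsRequested - itemsReceived)
            .setBytes(bytesRequested - bytesReceived)
            .build();
      }

      /** Records a received work item so all counters move together, with no torn reads. */
      synchronized void recordWorkItemReceived(long serializedBytes) {
        itemsReceived++;
        bytesReceived += serializedBytes;
      }
    }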
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -110,19 +113,24 @@ private GrpcDirectGetWorkStream(
streamRegistry,
logEveryNStreamFailures,
backendWorkerToken);
- this.request = request;
+ this.requestHeader = requestHeader;
this.getWorkThrottleTimer = getWorkThrottleTimer;
this.workItemScheduler = workItemScheduler;
this.workItemAssemblers = new ConcurrentHashMap<>();
- this.heartbeatSender = Suppliers.memoize(heartbeatSender::get);
- this.workCommitter = Suppliers.memoize(workCommitter::get);
- this.getDataClient = Suppliers.memoize(getDataClient::get);
- this.inFlightBudget = new AtomicReference<>(GetWorkBudget.noBudget());
- this.nextBudgetAdjustment = new AtomicReference<>(GetWorkBudget.noBudget());
- this.pendingResponseBudget = new AtomicReference<>(GetWorkBudget.noBudget());
+ this.heartbeatSender = heartbeatSender;
+ this.workCommitter = workCommitter;
+ this.getDataClient = getDataClient;
+ this.maxGetWorkBudget =
Review Comment:
can this max be moved into the tracker and synchronized within it? As is, there are races between setting and using this when calling into the tracker.
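For example, the target budget could live inside the tracker and be swapped under the same lock that computes extensions, avoiding the set-then-read race (a sketch; the method names below are made up for illustration):

    // Inside the (synchronized) GetWorkBudgetTracker sketch above:
    @GuardedBy("this") private GetWorkBudget maxGetWorkBudget = GetWorkBudget.noBudget();

    /** Updates the target budget and computes the next extension atomically with respect to it. */
    synchronized GetWorkBudget setMaxAndComputeExtension(GetWorkBudget newMax) {
      this.maxGetWorkBudget = newMax;
      return computeBudgetExtensionLocked(); // existing extension math, now under the same lock
    }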
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]