m-trieu commented on code in PR #32775:
URL: https://github.com/apache/beam/pull/32775#discussion_r1803918899
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
@Override
public synchronized void start() {
- Preconditions.checkState(!started, "StreamingEngineClient cannot start
twice.");
- // Starts the stream, this value is memoized.
- getWorkerMetadataStream.get();
- startWorkerMetadataConsumer();
- getWorkBudgetRefresher.start();
+ Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness
cannot start twice.");
+ getWorkerMetadataStream =
+ streamFactory.createGetWorkerMetadataStream(
+ dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+ getWorkerMetadataThrottleTimer,
+ this::consumeWorkerMetadata);
started = true;
}
public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
- return connections.get().windmillConnections().keySet().stream()
+ return backends.get().windmillStreams().keySet().stream()
.map(Endpoint::directEndpoint)
.filter(Optional::isPresent)
.map(Optional::get)
- .filter(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() !=
WindmillServiceAddress.Kind.IPV6)
- .map(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() ==
WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
- ? windmillServiceAddress.gcpServiceAddress()
- :
windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+ .map(WindmillServiceAddress::getServiceAddress)
.collect(toImmutableSet());
}
/**
- * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or
defaults to {@link
- * GetDataStream} pointing to dispatcher.
+ * Fetches the {@link GetDataStream} mapped to globalDataKey, or throws {@link
+ * NoSuchElementException} if one is not found.
*/
private GetDataStream getGlobalDataStream(String globalDataKey) {
- return
Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
- .map(Supplier::get)
- .orElseGet(
- () ->
- streamFactory.createGetDataStream(
- dispatcherClient.getWindmillServiceStub(), new
ThrottleTimer()));
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
+ return
Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+ .map(GlobalDataStreamSender::get)
+ .orElseThrow(
+ () -> new NoSuchElementException("No endpoint for global data tag:
" + globalDataKey));
}
@VisibleForTesting
@Override
public synchronized void shutdown() {
- Preconditions.checkState(started, "StreamingEngineClient never started.");
- getWorkerMetadataStream.get().halfClose();
- getWorkBudgetRefresher.stop();
- newWorkerMetadataPublisher.shutdownNow();
- newWorkerMetadataConsumer.shutdownNow();
+ Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness
never started.");
+ Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
Review Comment:
Done.
Used shutdown instead, followed by awaitTermination.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
@Override
public synchronized void start() {
- Preconditions.checkState(!started, "StreamingEngineClient cannot start
twice.");
- // Starts the stream, this value is memoized.
- getWorkerMetadataStream.get();
- startWorkerMetadataConsumer();
- getWorkBudgetRefresher.start();
+ Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness
cannot start twice.");
+ getWorkerMetadataStream =
+ streamFactory.createGetWorkerMetadataStream(
+ dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+ getWorkerMetadataThrottleTimer,
+ this::consumeWorkerMetadata);
started = true;
}
public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
- return connections.get().windmillConnections().keySet().stream()
+ return backends.get().windmillStreams().keySet().stream()
.map(Endpoint::directEndpoint)
.filter(Optional::isPresent)
.map(Optional::get)
- .filter(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() !=
WindmillServiceAddress.Kind.IPV6)
- .map(
- windmillServiceAddress ->
- windmillServiceAddress.getKind() ==
WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
- ? windmillServiceAddress.gcpServiceAddress()
- :
windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+ .map(WindmillServiceAddress::getServiceAddress)
.collect(toImmutableSet());
}
/**
- * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or
defaults to {@link
- * GetDataStream} pointing to dispatcher.
+ * Fetches the {@link GetDataStream} mapped to globalDataKey, or throws {@link
+ * NoSuchElementException} if one is not found.
*/
private GetDataStream getGlobalDataStream(String globalDataKey) {
- return
Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
- .map(Supplier::get)
- .orElseGet(
- () ->
- streamFactory.createGetDataStream(
- dispatcherClient.getWindmillServiceStub(), new
ThrottleTimer()));
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
+ return
Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+ .map(GlobalDataStreamSender::get)
+ .orElseThrow(
+ () -> new NoSuchElementException("No endpoint for global data tag:
" + globalDataKey));
}
@VisibleForTesting
@Override
public synchronized void shutdown() {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]