tgroh closed pull request #4127: [BEAM-2899] Add a Logging Service to
FnExecution
URL: https://github.com/apache/beam/pull/4127
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto
b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 132d366f707..a2d0eb47942 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -636,9 +636,9 @@ message LogEntry {
// free not to use all severity levels in their log messages.
message Severity {
enum Enum {
+ // Unspecified level information. Will be logged at the TRACE level.
UNSPECIFIED = 0;
- // Trace level information, also the default log level unless
- // another severity is specified.
+ // Trace level information.
TRACE = 1;
// Debugging information.
DEBUG = 2;
diff --git a/runners/java-fn-execution/pom.xml
b/runners/java-fn-execution/pom.xml
index 3ebcfd0c8d9..f275d69207e 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -79,6 +79,11 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
@@ -98,9 +103,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
index 9ea0fce4636..547475c4be6 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
@@ -21,4 +21,15 @@
import io.grpc.BindableService;
/** An interface sharing common behavior with services used during execution
of user Fns. */
-public interface FnService extends AutoCloseable, BindableService {}
+public interface FnService extends AutoCloseable, BindableService {
+ /**
+ * {@inheritDoc}.
+ *
+ * <p>There should be no more calls to any service method by the time a call
to {@link #close()}
+ * begins. Specifically, this means that a {@link io.grpc.Server} that this
service is bound to
+ * should have completed a call to the {@link io.grpc.Server#shutdown()}
method, and all future
+ * incoming calls will be rejected.
+ */
+ @Override
+ void close() throws Exception;
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
new file mode 100644
index 00000000000..9fe4a5fa07c
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessServerBuilder;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+
+/**
+ * A {@link ServerFactory} which creates {@link Server servers} with the {@link
+ * InProcessServerBuilder}.
+ */
+public class InProcessServerFactory extends ServerFactory {
+ private static final AtomicInteger serviceNameUniqifier = new
AtomicInteger();
+
+ public static InProcessServerFactory create() {
+ return new InProcessServerFactory();
+ }
+
+ private InProcessServerFactory() {}
+
+ @Override
+ public Server allocatePortAndCreate(BindableService service,
ApiServiceDescriptor.Builder builder)
+ throws IOException {
+ String name = String.format("InProcessServer_%s",
serviceNameUniqifier.getAndIncrement());
+ builder.setUrl(name);
+ return InProcessServerBuilder.forName(name).addService(service).build();
+ }
+
+ @Override
+ public Server create(
+ BindableService service, ApiServiceDescriptor serviceDescriptor) throws
IOException {
+ return
InProcessServerBuilder.forName(serviceDescriptor.getUrl()).addService(service).build();
+ }
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingService.java
new file mode 100644
index 00000000000..37c1f56b59b
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import com.google.common.collect.ImmutableSet;
+import io.grpc.stub.StreamObserver;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogControl;
+import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An implementation of the Beam Fn Logging Service over gRPC. */
+public class GrpcLoggingService extends BeamFnLoggingGrpc.BeamFnLoggingImplBase
+ implements FnService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcLoggingService.class);
+
+ public static GrpcLoggingService forWriter(LogWriter writer) {
+ return new GrpcLoggingService(writer);
+ }
+
+ private final LogWriter logWriter;
+ private final ConcurrentMap<InboundObserver, StreamObserver<LogControl>>
connectedClients;
+
+ private GrpcLoggingService(LogWriter logWriter) {
+ this.logWriter = logWriter;
+ connectedClients = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void close() throws Exception {
+ Set<InboundObserver> remainingClients =
ImmutableSet.copyOf(connectedClients.keySet());
+ if (!remainingClients.isEmpty()) {
+ LOGGER.info(
+ "{} Beam Fn Logging clients still connected during shutdown.",
remainingClients.size());
+
+ // Signal server shutting down to all remaining connected clients.
+ for (InboundObserver client : remainingClients) {
+ // We remove these from the connected clients map to prevent a race
between
+ // this close method and the InboundObserver calling a terminal method
on the
+ // StreamObserver. If we removed it, then we are responsible for the
terminal call.
+ completeIfNotNull(connectedClients.remove(client));
+ }
+ }
+ }
+
+ @Override
+ public StreamObserver<BeamFnApi.LogEntry.List> logging(
+ StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ LOGGER.info("Beam Fn Logging client connected.");
+ InboundObserver inboundObserver = new InboundObserver();
+ connectedClients.put(inboundObserver, outboundObserver);
+ return inboundObserver;
+ }
+
+ private void completeIfNotNull(StreamObserver<BeamFnApi.LogControl>
outboundObserver) {
+ if (outboundObserver != null) {
+ try {
+ outboundObserver.onCompleted();
+ } catch (RuntimeException ignored) {
+ // Completing outbound observer failed, ignoring failure and continuing
+ LOGGER.warn("Beam Fn Logging client failed to be complete.", ignored);
+ }
+ }
+ }
+
+ /**
+ * An inbound {@link StreamObserver} that forwards incoming messages to the
client logger.
+ *
+ * <p>Mutually hangs up on clients that have errored or completed.
+ */
+ private class InboundObserver implements
StreamObserver<BeamFnApi.LogEntry.List> {
+ @Override
+ public void onNext(BeamFnApi.LogEntry.List value) {
+ for (BeamFnApi.LogEntry logEntry : value.getLogEntriesList()) {
+ logWriter.log(logEntry);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOGGER.warn("Logging client failed unexpectedly.", t);
+ // We remove these from the connected clients map to prevent a race
between
+ // the close method and this InboundObserver calling a terminal method
on the
+ // StreamObserver. If we removed it, then we are responsible for the
terminal call.
+ completeIfNotNull(connectedClients.remove(this));
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("Logging client hanged up.");
+ // We remove these from the connected clients map to prevent a race
between
+ // the close method and this InboundObserver calling a terminal method
on the
+ // StreamObserver. If we removed it, then we are responsible for the
terminal call.
+ completeIfNotNull(connectedClients.remove(this));
+ }
+ }
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/LogWriter.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/LogWriter.java
new file mode 100644
index 00000000000..7ec49d51d3a
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/LogWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+
+/**
+ * A consumer of {@link BeamFnApi.LogEntry Beam Log Entries}.
+ */
+public interface LogWriter {
+ /**
+ * Write the contents of the Log Entry to some logging backend.
+ */
+ void log(BeamFnApi.LogEntry entry);
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/Slf4jLogWriter.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/Slf4jLogWriter.java
new file mode 100644
index 00000000000..c76e16bcba9
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/Slf4jLogWriter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link LogWriter} which uses an {@link Logger SLF4J Logger} as the
underlying log backend.
+ *
+ * <p>Ignores the {@code timestamp}, {@code instruction reference}, {@code
primitive transform
+ * reference}, and {@code thread} fields.
+ */
+public class Slf4jLogWriter implements LogWriter {
+ public static LogWriter getDefault() {
+ return new Slf4jLogWriter();
+ }
+
+ private Slf4jLogWriter() {}
+
+ @Override
+ public void log(LogEntry entry) {
+ Logger log;
+ String location = entry.getLogLocation();
+ if (location != null) {
+ log = LoggerFactory.getLogger(location);
+ } else {
+ // TODO: Provide a useful default
+ log = LoggerFactory.getLogger("RemoteLog");
+ }
+ String message = entry.getMessage();
+ String trace = entry.getTrace();
+ switch (entry.getSeverity()) {
+ case ERROR:
+ case CRITICAL:
+ if (trace == null) {
+ log.error(message);
+ } else {
+ log.error("{} {}", message, trace);
+ }
+ break;
+ case WARN:
+ if (trace == null) {
+ log.warn(message);
+ } else {
+ log.warn("{} {}", message, trace);
+ }
+ break;
+ case INFO:
+ case NOTICE:
+ log.info(message);
+ break;
+ case DEBUG:
+ log.debug(message);
+ break;
+ case UNSPECIFIED:
+ case TRACE:
+ log.trace(message);
+ break;
+ default:
+ log.warn("Unknown message severity {}", entry.getSeverity());
+ log.info(message);
+ break;
+ }
+ }
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/package-info.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/package-info.java
new file mode 100644
index 00000000000..494f276f43d
--- /dev/null
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes used to log informational messages over the {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc Beam Fn Logging
Service}.
+ */
+package org.apache.beam.runners.fnexecution.logging;
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
new file mode 100644
index 00000000000..c267a2a08ae
--- /dev/null
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogControl;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
+import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GrpcLoggingService}. */
+@RunWith(JUnit4.class)
+public class GrpcLoggingServiceTest {
+ private Consumer<LogControl> messageDiscarder = new Consumer<LogControl>() {
+ @Override
+ public void accept(LogControl item) {
+ // Ignore
+ }
+ };
+
+ @Test
+ public void testMultipleClientsSuccessfullyProcessed() throws Exception {
+ ConcurrentLinkedQueue<BeamFnApi.LogEntry> logs = new
ConcurrentLinkedQueue<>();
+ GrpcLoggingService service =
+ GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
+ try (GrpcFnServer<GrpcLoggingService> server =
+ GrpcFnServer.allocatePortAndCreateFor(service,
InProcessServerFactory.create())) {
+
+ Collection<Callable<Void>> tasks = new ArrayList<>();
+ for (int i = 1; i <= 3; ++i) {
+ final int instructionReference = i;
+ tasks.add(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ CountDownLatch waitForServerHangup = new CountDownLatch(1);
+ String url = server.getApiServiceDescriptor().getUrl();
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(url)
+ .build();
+ StreamObserver<BeamFnApi.LogEntry.List> outboundObserver =
+ BeamFnLoggingGrpc.newStub(channel)
+ .logging(
+ TestStreams.withOnNext(messageDiscarder)
+ .withOnCompleted(new
CountDown(waitForServerHangup))
+ .build());
+ outboundObserver.onNext(
+ createLogsWithIds(instructionReference,
-instructionReference));
+ outboundObserver.onCompleted();
+ waitForServerHangup.await();
+ return null;
+ }
+ });
+ }
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.invokeAll(tasks);
+ assertThat(logs,
+ containsInAnyOrder(createLogWithId(1L), createLogWithId(2L),
createLogWithId(3L),
+ createLogWithId(-1L), createLogWithId(-2L),
createLogWithId(-3L)));
+ }
+ }
+
+ @Test
+ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws
Exception {
+ ConcurrentLinkedQueue<BeamFnApi.LogEntry> logs = new
ConcurrentLinkedQueue<>();
+ GrpcLoggingService service =
+ GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
+ try (GrpcFnServer<GrpcLoggingService> server =
+ GrpcFnServer.allocatePortAndCreateFor(service,
InProcessServerFactory.create())) {
+
+ Collection<Callable<Void>> tasks = new ArrayList<>();
+ for (int i = 1; i <= 3; ++i) {
+ final int instructionReference = i;
+ tasks.add(
+ new Callable<Void>() {
+ public Void call() throws Exception {
+ CountDownLatch waitForTermination = new CountDownLatch(1);
+ ManagedChannel channel =
+
InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
+ .build();
+ StreamObserver<BeamFnApi.LogEntry.List> outboundObserver =
+ BeamFnLoggingGrpc.newStub(channel)
+ .logging(
+ TestStreams.withOnNext(messageDiscarder)
+ .withOnError(new CountDown(waitForTermination))
+ .build());
+ outboundObserver.onNext(
+ createLogsWithIds(instructionReference,
-instructionReference));
+ outboundObserver.onError(new RuntimeException("Client " +
instructionReference));
+ waitForTermination.await();
+ return null;
+ }
+ });
+ }
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.invokeAll(tasks);
+ }
+ }
+
+ @Test
+ public void testServerCloseHangsUpClients() throws Exception {
+ LinkedBlockingQueue<LogEntry> logs = new LinkedBlockingQueue<>();
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ Collection<Future<Void>> futures = new ArrayList<>();
+ final GrpcLoggingService service =
+ GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
+ try (GrpcFnServer<GrpcLoggingService> server =
+ GrpcFnServer.allocatePortAndCreateFor(service,
InProcessServerFactory.create())) {
+
+ for (int i = 1; i <= 3; ++i) {
+ final long instructionReference = i;
+ futures.add(
+ executorService.submit(
+ new Callable<Void>() {
+ public Void call() throws Exception {
+ {
+ CountDownLatch waitForServerHangup = new
CountDownLatch(1);
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(
+ server.getApiServiceDescriptor().getUrl())
+ .build();
+ StreamObserver<BeamFnApi.LogEntry.List> outboundObserver
=
+ BeamFnLoggingGrpc.newStub(channel)
+ .logging(
+ TestStreams.withOnNext(messageDiscarder)
+ .withOnCompleted(new
CountDown(waitForServerHangup))
+ .build());
+
outboundObserver.onNext(createLogsWithIds(instructionReference));
+ waitForServerHangup.await();
+ return null;
+ }
+ }
+ }));
+ }
+ // Wait till each client has sent their message showing that they have
connected.
+ for (int i = 1; i <= 3; ++i) {
+ logs.take();
+ }
+ }
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ }
+
+ private BeamFnApi.LogEntry.List createLogsWithIds(long... ids) {
+ BeamFnApi.LogEntry.List.Builder builder =
BeamFnApi.LogEntry.List.newBuilder();
+ for (long id : ids) {
+ builder.addLogEntries(createLogWithId(id));
+ }
+ return builder.build();
+ }
+ private BeamFnApi.LogEntry createLogWithId(long id) {
+ return
BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.toString(id)).build();
+ }
+
+ private static class CollectionAppendingLogWriter implements LogWriter {
+ private final Collection<BeamFnApi.LogEntry> entries;
+
+ private CollectionAppendingLogWriter(Collection<LogEntry> entries) {
+ this.entries = entries;
+ }
+
+ @Override
+ public void log(LogEntry entry) {
+ entries.add(entry);
+ }
+ }
+
+ /**
+ * A {@link Runnable} that calls {@link CountDownLatch#countDown()} on a
{@link CountDownLatch}.
+ */
+ private static class CountDown implements Runnable {
+ private final CountDownLatch latch;
+
+ CountDown(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ }
+}
diff --git
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
index 5df505b1f24..b07ea54e451 100644
---
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
+++
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
@@ -30,7 +30,7 @@
public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
return new Builder<>(new ForwardingCallStreamObserver<>(
onNext,
- TestStreams.<Throwable>noopConsumer(),
+ TestStreams.throwingErrorHandler(),
TestStreams.noopRunnable(),
TestStreams.alwaysTrueSupplier()));
}
@@ -97,6 +97,15 @@ public void accept(Throwable t) {
}
}
+ private static Consumer<Throwable> throwingErrorHandler() {
+ return new Consumer<Throwable>() {
+ @Override
+ public void accept(Throwable item) {
+ throw new RuntimeException(item);
+ }
+ };
+ }
+
private static void noop() {
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services