Repository: beam
Updated Branches:
  refs/heads/DSL_SQL fcc80ce84 -> 4c5b7584a
[BEAM-1347] Add additional logging

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7905def3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7905def3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7905def3

Branch: refs/heads/DSL_SQL
Commit: 7905def326a81e1830a0cbb3bcce0b304a2f9878
Parents: bf2d300
Author: Luke Cwik <[email protected]>
Authored: Mon Jun 5 15:03:50 2017 -0700
Committer: Luke Cwik <[email protected]>
Committed: Mon Jun 5 15:03:50 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/fn/harness/control/BeamFnControlClient.java | 3 ++-
 .../org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java | 5 ++++-
 .../apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java | 3 ++-
 3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/7905def3/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index e40bb2f..1c4d277 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -89,7 +89,7 @@ public class BeamFnControlClient {
   private class InboundObserver implements StreamObserver<BeamFnApi.InstructionRequest> {
     @Override
     public void onNext(BeamFnApi.InstructionRequest value) {
-      LOG.info("InstructionRequest received {}", value);
+      LOG.debug("Received InstructionRequest {}", value);
       Uninterruptibles.putUninterruptibly(bufferedInstructions, value);
     }
 
@@ -155,6 +155,7 @@
public class BeamFnControlClient {
   }
 
   public void sendInstructionResponse(BeamFnApi.InstructionResponse value) {
+    LOG.debug("Sending InstructionResponse {}", value);
     outboundObserver.onNext(value);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7905def3/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
index 4137cd7..8351626 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
@@ -78,7 +78,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
       KV<String, BeamFnApi.Target> inputLocation,
       Coder<WindowedValue<T>> coder,
       ThrowingConsumer<WindowedValue<T>> consumer) {
-    LOG.debug("Registering consumer instruction {} for target {}",
+    LOG.debug("Registering consumer for instruction {} and target {}",
         inputLocation.getKey(),
         inputLocation.getValue());
 
@@ -106,6 +106,9 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
       Coder<WindowedValue<T>> coder) {
     BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
 
+    LOG.debug("Creating output consumer for instruction {} and target {}",
+        outputLocation.getKey(),
+        outputLocation.getValue());
     return new BeamFnDataBufferingOutboundObserver<>(
         options, outputLocation, coder, client.getOutboundObserver());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7905def3/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
index 15e8c0d..8ee5491 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
@@ -104,7 +104,8 @@ public class BeamFnDataGrpcMultiplexer {
               KV.of(data.getInstructionReference(), data.getTarget());
           CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = futureForKey(key);
           if (!consumer.isDone()) {
-            LOG.debug("Received data for key {} without consumer ready.", key);
+            LOG.debug("Received data for key {} without consumer ready. "
+                + "Waiting for consumer to be registered.", key);
           }
           consumer.get().accept(data);
           if (data.getData().isEmpty()) {
