http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java new file mode 100644 index 0000000..14e26f0 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.control; + +import com.google.protobuf.Message; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A handler and datastore for types that be can be registered via the Fn API. 
+ * + * <p>Allows for {@link org.apache.beam.fn.v1.BeamFnApi.RegisterRequest}s to occur in parallel with + * subsequent requests that may lookup registered values by blocking lookups until registration + * occurs. + */ +public class RegisterHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(RegisterHandler.class); + private final ConcurrentMap<Long, CompletableFuture<Message>> idToObject; + + public RegisterHandler() { + idToObject = new ConcurrentHashMap<>(); + } + + public <T extends Message> T getById(long id) { + try { + @SuppressWarnings("unchecked") + CompletableFuture<T> returnValue = (CompletableFuture<T>) computeIfAbsent(id); + /* + * TODO: Even though the register request instruction occurs before the process bundle + * instruction in the control stream, the instructions are being processed in parallel + * in the Java harness causing a data race which is why we use a future. This will block + * forever in the case of a runner having a bug sending the wrong ids. Instead of blocking + * forever, we could do a timed wait or come up with another way of ordering the instruction + * processing to remove the data race. 
+ */ + return returnValue.get(); + } catch (ExecutionException e) { + throw new RuntimeException(String.format("Failed to load %s", id), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(String.format("Failed to load %s", id), e); + } + } + + public BeamFnApi.InstructionResponse.Builder register(BeamFnApi.InstructionRequest request) { + BeamFnApi.InstructionResponse.Builder response = BeamFnApi.InstructionResponse.newBuilder() + .setRegister(RegisterResponse.getDefaultInstance()); + + BeamFnApi.RegisterRequest registerRequest = request.getRegister(); + for (BeamFnApi.ProcessBundleDescriptor processBundleDescriptor + : registerRequest.getProcessBundleDescriptorList()) { + LOGGER.debug("Registering {} with type {}", + processBundleDescriptor.getId(), + processBundleDescriptor.getClass()); + computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor); + for (BeamFnApi.Coder coder : processBundleDescriptor.getCodersList()) { + LOGGER.debug("Registering {} with type {}", + coder.getFunctionSpec().getId(), + coder.getClass()); + computeIfAbsent(coder.getFunctionSpec().getId()).complete(coder); + } + } + + return response; + } + + private CompletableFuture<Message> computeIfAbsent(long id) { + return idToObject.computeIfAbsent(id, (Long ignored) -> new CompletableFuture<>()); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java new file mode 100644 index 0000000..6535555 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Control service client and individual request handlers. 
+ */ +package org.apache.beam.fn.harness.control; http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java new file mode 100644 index 0000000..3bf44ab --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A buffering outbound {@link Consumer} for the Beam Fn Data API. + * + * <p>Encodes individually consumed elements with the provided {@link Coder} producing + * a single {@link org.apache.beam.fn.v1.BeamFnApi.Elements} message when the buffer threshold + * is surpassed. + * + * <p>The default buffer threshold can be overridden by specifying the experiment + * {@code beam_fn_api_data_buffer_limit=<bytes>} + * + * <p>TODO: Handle outputting large elements (> 2GiBs). Note that this also applies to the + * input side as well. + * + * <p>TODO: Handle outputting elements that are zero bytes by outputting a single byte as + * a marker, detect on the input side that no bytes were read and force reading a single byte. 
+ */ +public class BeamFnDataBufferingOutboundObserver<T> + implements CloseableThrowingConsumer<WindowedValue<T>> { + private static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit="; + private static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000; + private static final Logger LOGGER = + LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class); + + private long byteCounter; + private long counter; + private final int bufferLimit; + private final Coder<WindowedValue<T>> coder; + private final KV<Long, BeamFnApi.Target> outputLocation; + private final StreamObserver<BeamFnApi.Elements> outboundObserver; + private final ByteString.Output bufferedElements; + + public BeamFnDataBufferingOutboundObserver( + PipelineOptions options, + KV<Long, BeamFnApi.Target> outputLocation, + Coder<WindowedValue<T>> coder, + StreamObserver<BeamFnApi.Elements> outboundObserver) { + this.bufferLimit = getBufferLimit(options); + this.outputLocation = outputLocation; + this.coder = coder; + this.outboundObserver = outboundObserver; + this.bufferedElements = ByteString.newOutput(); + } + + /** + * Returns the {@code beam_fn_api_data_buffer_limit=<int>} experiment value if set. Otherwise + * returns the default buffer limit. + */ + private static int getBufferLimit(PipelineOptions options) { + List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments(); + for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) { + if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) { + return Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length())); + } + } + return DEFAULT_BUFFER_LIMIT_BYTES; + } + + @Override + public void close() throws Exception { + BeamFnApi.Elements.Builder elements = convertBufferForTransmission(); + // This will add an empty data block representing the end of stream. 
+ elements.addDataBuilder() + .setInstructionReference(outputLocation.getKey()) + .setTarget(outputLocation.getValue()); + + LOGGER.debug("Closing stream for instruction {} and " + + "target {} having transmitted {} values {} bytes", + outputLocation.getKey(), + outputLocation.getValue(), + counter, + byteCounter); + outboundObserver.onNext(elements.build()); + } + + @Override + public void accept(WindowedValue<T> t) throws IOException { + coder.encode(t, bufferedElements, Context.NESTED); + counter += 1; + if (bufferedElements.size() >= bufferLimit) { + outboundObserver.onNext(convertBufferForTransmission().build()); + } + } + + private BeamFnApi.Elements.Builder convertBufferForTransmission() { + BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder(); + if (bufferedElements.size() == 0) { + return elements; + } + + elements.addDataBuilder() + .setInstructionReference(outputLocation.getKey()) + .setTarget(outputLocation.getValue()) + .setData(bufferedElements.toByteString()); + + byteCounter += bufferedElements.size(); + bufferedElements.reset(); + return elements; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java new file mode 100644 index 0000000..27b1acb --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.data; + +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + +/** + * The {@link BeamFnDataClient} is able to forward inbound elements to a consumer and is also a + * consumer of outbound elements. Callers can register themselves as consumers for inbound elements + * or can get a handle for a consumer for outbound elements. + */ +public interface BeamFnDataClient { + /** + * Registers the following inbound consumer for the provided instruction id and target. + * + * <p>The provided coder is used to decode inbound elements. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream, + * the returned future is completed successfully. + * + * <p>The consumer is not required to be thread safe. 
+ */ + <T> CompletableFuture<Void> forInboundConsumer( + BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + KV<Long, BeamFnApi.Target> inputLocation, + Coder<WindowedValue<T>> coder, + ThrowingConsumer<WindowedValue<T>> consumer); + + /** + * Creates a closeable consumer using the provided instruction id and target. + * + * <p>The provided coder is used to encode elements on the outbound stream. + * + * <p>Closing the returned consumer signals the end of the stream. + * + * <p>The returned closeable consumer is not thread safe. + */ + <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer( + BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + KV<Long, BeamFnApi.Target> outputLocation, + Coder<WindowedValue<T>> coder); +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java new file mode 100644 index 0000000..9bbdc78 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.data; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.fn.v1.BeamFnDataGrpc; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that uses gRPC for sending and receiving data. + * + * <p>TODO: Handle closing clients that are currently not a consumer nor are being consumed. 
+ */ +public class BeamFnDataGrpcClient implements BeamFnDataClient { + private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataGrpcClient.class); + + private final ConcurrentMap<BeamFnApi.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache; + private final Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory; + private final BiFunction<Function<StreamObserver<BeamFnApi.Elements>, + StreamObserver<BeamFnApi.Elements>>, + StreamObserver<BeamFnApi.Elements>, + StreamObserver<BeamFnApi.Elements>> streamObserverFactory; + private final PipelineOptions options; + + public BeamFnDataGrpcClient( + PipelineOptions options, + Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory, + BiFunction<Function<StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>>, + StreamObserver<BeamFnApi.Elements>, + StreamObserver<BeamFnApi.Elements>> streamObserverFactory) { + this.options = options; + this.channelFactory = channelFactory; + this.streamObserverFactory = streamObserverFactory; + this.cache = new ConcurrentHashMap<>(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * <p>The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public <T> CompletableFuture<Void> forInboundConsumer( + BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + KV<Long, BeamFnApi.Target> inputLocation, + Coder<WindowedValue<T>> coder, + ThrowingConsumer<WindowedValue<T>> consumer) { + LOGGER.debug("Registering consumer instruction {} for target {}", + inputLocation.getKey(), + inputLocation.getValue()); + + CompletableFuture<Void> readFuture = new CompletableFuture<>(); + BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor); + client.futureForKey(inputLocation).complete( + new BeamFnDataInboundObserver<>(coder, consumer, readFuture)); + return readFuture; + } + + /** + * Creates a closeable consumer using the provided instruction id and target. + * + * <p>The provided coder is used to encode elements on the outbound stream. + * + * <p>On closing the returned consumer, an empty data block is sent as a signal of the + * logical data stream finishing. + * + * <p>The returned closeable consumer is not thread safe. 
+ */ + @Override + public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer( + BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + KV<Long, BeamFnApi.Target> outputLocation, + Coder<WindowedValue<T>> coder) { + BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor); + + return new BeamFnDataBufferingOutboundObserver<>( + options, outputLocation, coder, client.getOutboundObserver()); + } + + private BeamFnDataGrpcMultiplexer getClientFor( + BeamFnApi.ApiServiceDescriptor apiServiceDescriptor) { + return cache.computeIfAbsent(apiServiceDescriptor, + (BeamFnApi.ApiServiceDescriptor descriptor) -> new BeamFnDataGrpcMultiplexer( + descriptor, + (StreamObserver<BeamFnApi.Elements> inboundObserver) -> streamObserverFactory.apply( + BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data, + inboundObserver))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java new file mode 100644 index 0000000..ea059df --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.data; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A gRPC multiplexer for a specific {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor}. + * + * <p>Multiplexes data for inbound consumers based upon their individual + * {@link org.apache.beam.fn.v1.BeamFnApi.Target}s. + * + * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those + * streams. For inbound streams, this is as thread safe as the inbound observers. For outbound + * streams, this is as thread safe as the underlying stream observer. + * + * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying + * the output location with a specific outbound observer. 
+ */ +public class BeamFnDataGrpcMultiplexer { + private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class); + private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final StreamObserver<BeamFnApi.Elements> inboundObserver; + private final StreamObserver<BeamFnApi.Elements> outboundObserver; + @VisibleForTesting + final ConcurrentMap<KV<Long, BeamFnApi.Target>, + CompletableFuture<Consumer<BeamFnApi.Elements.Data>>> consumers; + + public BeamFnDataGrpcMultiplexer( + BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + Function<StreamObserver<BeamFnApi.Elements>, + StreamObserver<BeamFnApi.Elements>> outboundObserverFactory) { + this.apiServiceDescriptor = apiServiceDescriptor; + this.consumers = new ConcurrentHashMap<>(); + this.inboundObserver = new InboundObserver(); + this.outboundObserver = outboundObserverFactory.apply(inboundObserver); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("apiServiceDescriptor", apiServiceDescriptor) + .add("consumers", consumers) + .toString(); + } + + public StreamObserver<BeamFnApi.Elements> getInboundObserver() { + return inboundObserver; + } + + public StreamObserver<BeamFnApi.Elements> getOutboundObserver() { + return outboundObserver; + } + + public CompletableFuture<Consumer<BeamFnApi.Elements.Data>> futureForKey( + KV<Long, BeamFnApi.Target> key) { + return consumers.computeIfAbsent( + key, + (KV<Long, BeamFnApi.Target> providedKey) -> new CompletableFuture<>()); + } + + /** + * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass + * the elements to. + * + * <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the + * sending harness to initiate transmitting data without needing for the receiving harness to + * signal that it is ready to consume that data. 
+ */ + private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> { + @Override + public void onNext(BeamFnApi.Elements value) { + for (BeamFnApi.Elements.Data data : value.getDataList()) { + try { + KV<Long, BeamFnApi.Target> key = + KV.of(data.getInstructionReference(), data.getTarget()); + futureForKey(key).get().accept(data); + if (data.getData().isEmpty()) { + consumers.remove(key); + } + /* + * TODO: On failure we should fail any bundles that were impacted eagerly + * instead of relying on the Runner harness to do all the failure handling. + */ + } catch (ExecutionException | InterruptedException e) { + LOGGER.error( + "Client interrupted during handling of data for instruction {} and target {}", + data.getInstructionReference(), + data.getTarget(), + e); + outboundObserver.onError(e); + } catch (RuntimeException e) { + LOGGER.error( + "Client failed to handle data for instruction {} and target {}", + data.getInstructionReference(), + data.getTarget(), + e); + outboundObserver.onError(e); + } + } + } + + @Override + public void onError(Throwable t) { + LOGGER.error("Failed to handle for {}", apiServiceDescriptor, t); + } + + @Override + public void onCompleted() { + LOGGER.warn("Hanged up for {}.", apiServiceDescriptor); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java new file mode 100644 index 0000000..f8b5ab8 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.data; + +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Decodes individually consumed {@link org.apache.beam.fn.v1.BeamFnApi.Elements.Data} with the + * provided {@link Coder} passing the individual decoded elements to the provided consumer. 
+ */ +public class BeamFnDataInboundObserver<T> implements Consumer<BeamFnApi.Elements.Data> { + private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataInboundObserver.class); + private final ThrowingConsumer<WindowedValue<T>> consumer; + private final Coder<WindowedValue<T>> coder; + private final CompletableFuture<Void> readFuture; + private long byteCounter; + private long counter; + + public BeamFnDataInboundObserver( + Coder<WindowedValue<T>> coder, + ThrowingConsumer<WindowedValue<T>> consumer, + CompletableFuture<Void> readFuture) { + this.coder = coder; + this.consumer = consumer; + this.readFuture = readFuture; + } + + @Override + public void accept(BeamFnApi.Elements.Data t) { + if (readFuture.isDone()) { + // Drop any incoming data if the stream processing has finished. + return; + } + try { + if (t.getData().isEmpty()) { + LOGGER.debug("Closing stream for instruction {} and " + + "target {} having consumed {} values {} bytes", + t.getInstructionReference(), + t.getTarget(), + counter, + byteCounter); + readFuture.complete(null); + return; + } + + byteCounter += t.getData().size(); + InputStream inputStream = t.getData().newInput(); + while (inputStream.available() > 0) { + counter += 1; + WindowedValue<T> value = coder.decode(inputStream, Context.NESTED); + consumer.accept(value); + } + } catch (Exception e) { + readFuture.completeExceptionally(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java new file mode 100644 index 0000000..edaaa65 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Data service client and logical stream multiplexing. + */ +package org.apache.beam.fn.harness.data; http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java new file mode 100644 index 0000000..b3b7f48 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.fake; + +import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +
/**
 * A placeholder {@link AggregatorFactory}, to be replaced by a real implementation later.
 *
 * <p>Every {@link Aggregator} it creates silently discards added values while still reporting
 * the name and {@link CombineFn} it was created with.
 */
public class FakeAggregatorFactory implements AggregatorFactory {
  @Override
  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
      Class<?> fnClass,
      StepContext stepContext,
      String aggregatorName,
      CombineFn<InputT, AccumT, OutputT> combine) {
    return new DiscardingAggregator<>(aggregatorName, combine);
  }

  /** An {@link Aggregator} whose {@link #addValue} ignores its argument. */
  private static final class DiscardingAggregator<InputT, OutputT>
      implements Aggregator<InputT, OutputT> {
    private final String name;
    private final CombineFn<InputT, ?, OutputT> combineFn;

    DiscardingAggregator(String name, CombineFn<InputT, ?, OutputT> combineFn) {
      this.name = name;
      this.combineFn = combineFn;
    }

    @Override
    public void addValue(InputT value) {
      // Intentionally ignored.
    }

    @Override
    public String getName() {
      return name;
    }

    @Override
    public CombineFn<InputT, ?, OutputT> getCombineFn() {
      return combineFn;
    }
  }
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java new file mode 100644 index 0000000..84da059 --- /dev/null +++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.fake; + +import java.io.IOException; +import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.TupleTag; + +/** + * A fake {@link StepContext} that performs no-ops and does not support state or timers.
+ */
public class FakeStepContext implements StepContext {
  /** Placeholder returned for both the step name and the transform name. */
  private static final String PLACEHOLDER_NAME = "TODO";

  @Override
  public String getStepName() {
    return PLACEHOLDER_NAME;
  }

  @Override
  public String getTransformName() {
    return PLACEHOLDER_NAME;
  }

  /** Does nothing; outputs are not tracked by this fake. */
  @Override
  public void noteOutput(WindowedValue<?> output) {
    // Intentionally empty.
  }

  /** Does nothing; side outputs are not tracked by this fake. */
  @Override
  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
    // Intentionally empty.
  }

  /** Does nothing; the supplied view data is discarded. */
  @Override
  public <T, W extends BoundedWindow> void writePCollectionViewData(
      TupleTag<?> tag,
      Iterable<WindowedValue<T>> data,
      Coder<Iterable<WindowedValue<T>>> dataCoder,
      W window,
      Coder<W> windowCoder)
      throws IOException {
    // Intentionally empty.
  }

  /**
   * State is not supported by this fake.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public StateInternals<?> stateInternals() {
    throw new UnsupportedOperationException();
  }

  /**
   * Timers are not supported by this fake.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public TimerInternals timerInternals() {
    throw new UnsupportedOperationException();
  }
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java new file mode 100644 index 0000000..cd6eb02 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Fake implementations of bindings used with runners-core. + */ +package org.apache.beam.fn.harness.fake; http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java new file mode 100644 index 0000000..59ab149 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness.fn; +
/**
 * A {@link ThrowingConsumer} that can also be closed, so it can be managed with
 * try-with-resources.
 *
 * @param <T> the type of the input to the consumer
 */
public interface CloseableThrowingConsumer<T> extends AutoCloseable, ThrowingConsumer<T> {}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java new file mode 100644 index 0000000..9d505da --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.fn; + +import java.util.function.BiFunction; + +/** + * A {@link BiFunction} which can throw {@link Exception}s. + * + * <p>Used to expand the allowed set of method references to be used by Java 8 + * functional interfaces.
+ */
@FunctionalInterface
public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
  /**
   * Applies this function to the given arguments.
   *
   * @param first the first function argument
   * @param second the second function argument
   * @return the function result
   * @throws Exception if the function fails
   */
  OutputT apply(FirstInputT first, SecondInputT second) throws Exception;
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java new file mode 100644 index 0000000..b34e857 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.fn; + +import java.util.function.Consumer; + +/** + * A {@link Consumer} which can throw {@link Exception}s. + * + * <p>Used to expand the allowed set of method references to be used by Java 8 + * functional interfaces.
+ */
@FunctionalInterface
public interface ThrowingConsumer<InputT> {
  /**
   * Performs this operation on the given argument.
   *
   * @param input the input argument
   * @throws Exception if the operation fails
   */
  void accept(InputT input) throws Exception;
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java new file mode 100644 index 0000000..446ff60 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.fn; + +import java.util.function.Function; + +/** + * A {@link Function} which can throw {@link Exception}s. + * + * <p>Used to expand the allowed set of method references to be used by Java 8 + * functional interfaces.
+ */
@FunctionalInterface
public interface ThrowingFunction<InputT, OutputT> {
  /**
   * Applies this function to the given argument.
   *
   * @param value the function argument
   * @return the function result
   * @throws Exception if the function fails
   */
  OutputT apply(InputT value) throws Exception;
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java new file mode 100644 index 0000000..c7fc29e --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.fn; + +/** + * A {@link Runnable} which can throw {@link Exception}s. + * + * <p>Used to expand the allowed set of method references to be used by Java 8 + * functional interfaces.
+ */
@FunctionalInterface
public interface ThrowingRunnable {
  /**
   * Runs this operation.
   *
   * @throws Exception if the operation fails
   */
  void run() throws Exception;
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java new file mode 100644 index 0000000..bbf3396 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Java 8 functional interface extensions.
+ */ +package org.apache.beam.fn.harness.fn; http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java new file mode 100644 index 0000000..d74d9fa --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness.logging; + +import static com.google.common.base.Throwables.getStackTraceAsString; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Timestamp; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.logging.Formatter; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogManager; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.fn.v1.BeamFnLoggingGrpc; +import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API. 
+ */
public class BeamFnLoggingClient implements AutoCloseable {
  private static final String ROOT_LOGGER_NAME = "";
  private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity> LOG_LEVEL_MAP =
      ImmutableMap.<Level, BeamFnApi.LogEntry.Severity>builder()
          .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.ERROR)
          .put(Level.WARNING, BeamFnApi.LogEntry.Severity.WARN)
          .put(Level.INFO, BeamFnApi.LogEntry.Severity.INFO)
          .put(Level.FINE, BeamFnApi.LogEntry.Severity.DEBUG)
          .put(Level.FINEST, BeamFnApi.LogEntry.Severity.TRACE)
          .build();

  private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION =
      ImmutableMap.<DataflowWorkerLoggingOptions.Level, Level>builder()
          .put(DataflowWorkerLoggingOptions.Level.OFF, Level.OFF)
          .put(DataflowWorkerLoggingOptions.Level.ERROR, Level.SEVERE)
          .put(DataflowWorkerLoggingOptions.Level.WARN, Level.WARNING)
          .put(DataflowWorkerLoggingOptions.Level.INFO, Level.INFO)
          .put(DataflowWorkerLoggingOptions.Level.DEBUG, Level.FINE)
          .put(DataflowWorkerLoggingOptions.Level.TRACE, Level.FINEST)
          .build();

  private static final Formatter FORMATTER = new SimpleFormatter();

  /* Used to signal to a thread processing a queue to finish its work gracefully.
   * It is always compared by identity, never with equals(). */
  private static final BeamFnApi.LogEntry POISON_PILL =
      BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.MIN_VALUE).build();

  /**
   * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB,
   * this represents a buffer of about 10 MiBs.
   */
  private static final int MAX_BUFFERED_LOG_ENTRY_COUNT = 10_000;

  /* We need to store a reference to the configured loggers so that they are not
   * garbage collected. java.util.logging only has weak references to the loggers
   * so if they are garbage collected, our hierarchical configuration will be lost. */
  private final Collection<Logger> configuredLoggers;
  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
  private final ManagedChannel channel;
  private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
  private final LogControlObserver inboundObserver;
  private final LogRecordHandler logRecordHandler;
  private final CompletableFuture<Object> inboundObserverCompletion;

  /**
   * Resets the global {@link java.util.logging} configuration and routes all log records to the
   * Beam Fn Logging service.
   *
   * <p>The root logger's handlers are replaced by a handler that buffers log entries and streams
   * them to the service on a thread obtained from the {@link GcsOptions} executor service. Logger
   * levels are taken from the {@link DataflowWorkerLoggingOptions} view of {@code options}.
   *
   * @param options pipeline options providing log levels and the executor service
   * @param apiServiceDescriptor the logging service endpoint
   * @param channelFactory creates the channel used to reach {@code apiServiceDescriptor}
   * @param streamObserverFactory given the stub's streaming call and the inbound observer,
   *     returns the observer used to send log entries
   */
  public BeamFnLoggingClient(
      PipelineOptions options,
      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
      BiFunction<Function<StreamObserver<BeamFnApi.LogControl>,
                          StreamObserver<BeamFnApi.LogEntry.List>>,
                 StreamObserver<BeamFnApi.LogControl>,
                 StreamObserver<BeamFnApi.LogEntry.List>> streamObserverFactory) {
    this.apiServiceDescriptor = apiServiceDescriptor;
    this.inboundObserverCompletion = new CompletableFuture<>();
    this.configuredLoggers = new ArrayList<>();
    this.channel = channelFactory.apply(apiServiceDescriptor);

    // Reset the global log manager, get the root logger and remove the default log handlers.
    LogManager logManager = LogManager.getLogManager();
    logManager.reset();
    Logger rootLogger = logManager.getLogger(ROOT_LOGGER_NAME);
    for (Handler handler : rootLogger.getHandlers()) {
      rootLogger.removeHandler(handler);
    }

    // Use the passed in logging options to configure the various logger levels.
    DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
    if (loggingOptions.getDefaultWorkerLogLevel() != null) {
      rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultWorkerLogLevel()));
    }

    if (loggingOptions.getWorkerLogLevelOverrides() != null) {
      for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> loggerOverride :
          loggingOptions.getWorkerLogLevelOverrides().entrySet()) {
        Logger logger = Logger.getLogger(loggerOverride.getKey());
        logger.setLevel(LEVEL_CONFIGURATION.get(loggerOverride.getValue()));
        configuredLoggers.add(logger);
      }
    }

    BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
    inboundObserver = new LogControlObserver();
    logRecordHandler = new LogRecordHandler(options.as(GcsOptions.class).getExecutorService());
    logRecordHandler.setLevel(Level.ALL);
    outboundObserver = streamObserverFactory.apply(stub::logging, inboundObserver);
    rootLogger.addHandler(logRecordHandler);
  }

  /**
   * Flushes buffered log entries, hangs up with the server and waits for the server to hang up.
   * Then it restores the {@link java.util.logging} configuration and shuts the channel down.
   *
   * <p>The configuration restore and the channel shutdown run even if hanging up or waiting for
   * the server fails. Such a failure is rethrown afterwards.
   */
  @Override
  public void close() throws Exception {
    try {
      // Hang up with the server
      logRecordHandler.close();

      // Wait for the server to hang up
      inboundObserverCompletion.get();
    } finally {
      try {
        // Reset the logging configuration to what it is at startup
        for (Logger logger : configuredLoggers) {
          logger.setLevel(null);
        }
        configuredLoggers.clear();
        LogManager.getLogManager().readConfiguration();
      } finally {
        // Shut the channel down
        channel.shutdown();
        if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
          channel.shutdownNow();
        }
      }
    }
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(BeamFnLoggingClient.class)
        .add("apiServiceDescriptor", apiServiceDescriptor)
        .toString();
  }

  /**
   * A JUL {@link Handler} which converts log records into log entries and buffers them in a
   * bounded queue. It is also the {@link Runnable} that drains the queue in batches to the
   * outbound stream.
   */
  private class LogRecordHandler extends Handler implements Runnable {
    private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries =
        new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
    private final Future<?> bufferedLogWriter;
    private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler;
    // Set by the first close(); written under the outboundObserver lock, read without it in
    // publish(), hence volatile.
    private volatile boolean closed;

    private LogRecordHandler(ExecutorService executorService) {
      // Initialize everything run() reads before handing this object to another thread;
      // otherwise the writer thread may observe a null logEntryHandler.
      logEntryHandler = new ThreadLocal<>();
      bufferedLogWriter = executorService.submit(this);
    }

    @Override
    public void publish(LogRecord record) {
      // Once closed, nothing drains the queue any more, so drop the record. Otherwise a blocking
      // put could wait forever once the queue fills up.
      if (closed) {
        return;
      }
      BeamFnApi.LogEntry.Severity severity = LOG_LEVEL_MAP.get(record.getLevel());
      if (severity == null) {
        return;
      }
      BeamFnApi.LogEntry.Builder builder = BeamFnApi.LogEntry.newBuilder()
          .setSeverity(severity)
          .setLogLocation(record.getLoggerName())
          .setMessage(FORMATTER.formatMessage(record))
          .setThread(Integer.toString(record.getThreadID()))
          .setTimestamp(Timestamp.newBuilder()
              .setSeconds(record.getMillis() / 1000)
              .setNanos((int) (record.getMillis() % 1000) * 1_000_000));
      if (record.getThrown() != null) {
        builder.setTrace(getStackTraceAsString(record.getThrown()));
      }
      // The thread that sends log records should never perform a blocking publish and
      // only insert log records best effort. We detect which thread is logging
      // by using the thread local, defaulting to the blocking publish.
      MoreObjects.firstNonNull(
          logEntryHandler.get(), this::blockingPublish).accept(builder.build());
    }

    /** Blocks caller till enough space exists to publish this log entry. */
    private void blockingPublish(BeamFnApi.LogEntry logEntry) {
      try {
        bufferedLogEntries.put(logEntry);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }

    /**
     * Sends buffered log entries in batches until the poison pill is seen, whether the pill is
     * taken directly or drained as part of a batch.
     */
    @Override
    public void run() {
      // Logging which occurs in this thread will attempt to publish log entries into the
      // above handler which should never block if the queue is full otherwise
      // this thread will get stuck.
      logEntryHandler.set(bufferedLogEntries::offer);
      List<BeamFnApi.LogEntry> additionalLogEntries =
          new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
      try {
        boolean finished = false;
        while (!finished) {
          BeamFnApi.LogEntry logEntry = bufferedLogEntries.take();
          if (logEntry == POISON_PILL) {
            break;
          }
          BeamFnApi.LogEntry.List.Builder builder =
              BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
          // Batch up whatever else is already queued. The poison pill may be drained here instead
          // of being taken above. In that case send everything queued before it, drop anything
          // after it, and stop once this batch has been sent.
          bufferedLogEntries.drainTo(additionalLogEntries);
          for (BeamFnApi.LogEntry additionalLogEntry : additionalLogEntries) {
            if (additionalLogEntry == POISON_PILL) {
              finished = true;
              break;
            }
            builder.addLogEntries(additionalLogEntry);
          }
          // The buffer is reused for the next batch; clear it so each entry is sent only once.
          additionalLogEntries.clear();
          outboundObserver.onNext(builder.build());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } finally {
        // The executor may reuse this thread; don't leave this handler's thread-local behind.
        logEntryHandler.remove();
      }
    }

    @Override
    public void flush() {
    }

    /**
     * Stops the writer thread after it has sent everything queued so far, then completes the
     * outbound stream. Only the first call has any effect; later calls (for example a JUL reset
     * while this handler is still attached to the root logger) return immediately.
     */
    @Override
    public void close() {
      synchronized (outboundObserver) {
        if (closed) {
          return;
        }
        closed = true;
        // If the writer is already done, it has exited (possibly due to a failure), so there is
        // nothing to signal.
        if (!bufferedLogWriter.isDone()) {
          // Insert the poison pill at the end of the queue so that the remaining elements are
          // processed first. The order of these checks is important: short circuiting means we
          // only check whether the writer is done when inserting timed out. We stop retrying as
          // soon as the pill is enqueued or the writer has exited.
          try {
            while (!bufferedLogEntries.offer(POISON_PILL, 60, TimeUnit.SECONDS)
                && !bufferedLogWriter.isDone()) {
              // Queue still full and writer still running; retry.
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
          waitTillFinish();
        }
        outboundObserver.onCompleted();
      }
    }

    private void waitTillFinish() {
      try {
        bufferedLogWriter.get();
      } catch (CancellationException e) {
        // Ignore cancellations
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /** Ignores log control messages and records when, and how, the server hangs up. */
  private class LogControlObserver implements StreamObserver<BeamFnApi.LogControl> {
    @Override
    public void onNext(BeamFnApi.LogControl value) {
    }

    @Override
    public void onError(Throwable t) {
      inboundObserverCompletion.completeExceptionally(t);
    }

    @Override
    public void onCompleted() {
      inboundObserverCompletion.complete(null);
    }
  }
}
 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java new file mode 100644 index 0000000..7a4d0a8 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Logging service client and JUL logging handler adapter. + */ +package org.apache.beam.fn.harness.logging; http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java new file mode 100644 index 0000000..58080e4 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Top level package for Beam Java Fn Harness. 
+ */ +package org.apache.beam.fn.harness; http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java new file mode 100644 index 0000000..2007139 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.stream; + +import java.util.concurrent.Phaser; + +/** + * A {@link Phaser} which never terminates. The default {@link Phaser} implementation terminates + * as soon as an advance leaves it with no registered parties.
+ */
public final class AdvancingPhaser extends Phaser {

  /**
   * Creates a phaser that starts with {@code numParties} registered, unarrived parties.
   *
   * @param numParties number of parties that must arrive before the phase advances
   */
  public AdvancingPhaser(int numParties) {
    super(numParties);
  }

  /**
   * Never asks for termination: whatever the current {@code phase} and however many parties are
   * still registered, the phaser simply moves on to the next phase.
   *
   * @return always {@code false}
   */
  @Override
  protected boolean onAdvance(int phase, int registeredParties) {
    // Returning false on every advance keeps the phaser usable indefinitely.
    return false;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java new file mode 100644 index 0000000..cda3a4b --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.fn.harness.stream; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A thread safe {@link StreamObserver} which uses a bounded queue to pass elements to a processing + * thread responsible for interacting with the underlying {@link CallStreamObserver}. + * + * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser} + * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Callers + * are expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver} + * becomes ready. 
+ */
@ThreadSafe
public final class BufferingStreamObserver<T> implements StreamObserver<T> {
  /** Sentinel that tells the drain task to stop; it is never forwarded to the outbound observer. */
  private static final Object POISON_PILL = new Object();
  private final LinkedBlockingDeque<T> queue;
  private final Phaser phaser;
  private final CallStreamObserver<T> outboundObserver;
  private final Future<?> queueDrainer;
  private final int bufferSize;

  /**
   * Creates the observer and immediately submits the queue drain task to {@code executor}.
   *
   * @param phaser expected to be advanced whenever {@code outboundObserver} becomes ready
   * @param outboundObserver stream that queued elements are forwarded to by the drain task
   * @param executor runs the single drain task
   * @param bufferSize capacity of the bounded queue between callers and the drain task
   */
  public BufferingStreamObserver(
      Phaser phaser,
      CallStreamObserver<T> outboundObserver,
      ExecutorService executor,
      int bufferSize) {
    this.phaser = phaser;
    this.bufferSize = bufferSize;
    this.queue = new LinkedBlockingDeque<>(bufferSize);
    this.outboundObserver = outboundObserver;
    // Submitted last so that every field the drain task reads is already assigned.
    this.queueDrainer = executor.submit(this::drainQueue);
  }

  /**
   * Body of the drain task: forwards queued elements while the outbound observer reports ready,
   * waits for the phaser to advance while it is not, and returns once the poison pill is taken.
   */
  private void drainQueue() {
    try {
      while (true) {
        // Sample the phase before checking readiness: if the phaser advances in between,
        // awaitAdvance(currentPhase) returns immediately instead of missing the signal.
        int currentPhase = phaser.getPhase();
        while (outboundObserver.isReady()) {
          T value = queue.take();
          if (value != POISON_PILL) {
            outboundObserver.onNext(value);
          } else {
            return;
          }
        }
        phaser.awaitAdvance(currentPhase);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  /**
   * Enqueues {@code value} for the drain task, blocking while the queue is full.
   *
   * @throws IllegalStateException if the queue stays full and the drain task has finished
   * @throws RuntimeException wrapping an {@link InterruptedException}; the interrupt flag is
   *     restored
   */
  @Override
  public void onNext(T value) {
    try {
      // Attempt to add an element to the bounded queue occasionally checking to see
      // if the queue drainer is still alive.
      while (!queue.offer(value, 60, TimeUnit.SECONDS)) {
        checkState(!queueDrainer.isDone(), "Stream observer has finished.");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  /**
   * Stops the drain task eagerly, dropping anything still buffered, then forwards {@code t} to
   * the outbound observer.
   */
  @Override
  public void onError(Throwable t) {
    synchronized (outboundObserver) {
      // If we are done, then a previous caller has already shutdown the queue processing thread
      // hence we don't need to do it again.
      if (!queueDrainer.isDone()) {
        // Insert the poison pill at the front of the queue so the drain task stops before
        // forwarding any remaining buffered elements.
        offerPoisonPill(true);
        waitTillFinish();
      }
      outboundObserver.onError(t);
    }
  }

  /**
   * Lets the drain task forward everything already buffered, then completes the outbound
   * observer.
   */
  @Override
  public void onCompleted() {
    synchronized (outboundObserver) {
      // If we are done, then a previous caller has already shutdown the queue processing thread
      // hence we don't need to do it again.
      if (!queueDrainer.isDone()) {
        // Insert the poison pill at the end of the queue so the elements ahead of it are still
        // forwarded before the drain task exits.
        offerPoisonPill(false);
        waitTillFinish();
      }
      outboundObserver.onCompleted();
    }
  }

  @VisibleForTesting
  public int getBufferSize() {
    return bufferSize;
  }

  /**
   * Inserts a single poison pill, retrying in 60 second attempts while the queue is full. Gives
   * up early if the drain task has already finished, since nothing would ever consume the pill.
   *
   * @param atFront {@code true} to insert at the head of the queue, {@code false} for the tail
   */
  private void offerPoisonPill(boolean atFront) {
    @SuppressWarnings("unchecked") // Never handed to callers; only compared by reference.
    T poisonPill = (T) POISON_PILL;
    try {
      while (true) {
        boolean inserted =
            atFront
                ? queue.offerFirst(poisonPill, 60, TimeUnit.SECONDS)
                : queue.offer(poisonPill, 60, TimeUnit.SECONDS);
        // Stop as soon as one pill is queued. Only when the offer timed out do we consult the
        // drain task: if it has finished there is nobody left to take the pill. Looping on
        // "not inserted OR not done" would keep inserting extra pills until the drainer exits.
        if (inserted || queueDrainer.isDone()) {
          return;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  /**
   * Blocks until the drain task finishes. Cancellation is treated as normal completion; any
   * other failure is rethrown as a {@link RuntimeException}.
   */
  private void waitTillFinish() {
    try {
      queueDrainer.get();
    } catch (CancellationException e) {
      // Cancellation is expected
      return;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java new file mode 100644 index 0000000..82a1aa4 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.stream; + +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.Phaser; +import javax.annotation.concurrent.ThreadSafe; +
/**
 * A {@link StreamObserver} that writes straight through to a {@link CallStreamObserver}, making
 * every call thread safe by synchronizing on that outbound observer.
 *
 * <p>Before forwarding an element, {@link #onNext} checks {@link CallStreamObserver#isReady()}.
 * If the outbound observer is not ready, the call waits once for the {@link Phaser} to move past
 * the phase observed beforehand. Whoever creates this observer must advance the {@link Phaser}
 * each time the underlying {@link CallStreamObserver} becomes ready.
 */
@ThreadSafe
public final class DirectStreamObserver<T> implements StreamObserver<T> {
  private final Phaser phaser;
  private final CallStreamObserver<T> outboundObserver;

  public DirectStreamObserver(
      Phaser phaser,
      CallStreamObserver<T> outboundObserver) {
    this.outboundObserver = outboundObserver;
    this.phaser = phaser;
  }

  @Override
  public void onNext(T value) {
    awaitReadinessOnce();
    synchronized (outboundObserver) {
      outboundObserver.onNext(value);
    }
  }

  @Override
  public void onError(Throwable t) {
    synchronized (outboundObserver) {
      outboundObserver.onError(t);
    }
  }

  @Override
  public void onCompleted() {
    synchronized (outboundObserver) {
      outboundObserver.onCompleted();
    }
  }

  /**
   * Waits for at most one phase advance when the outbound observer is not ready. The phase is
   * read before the readiness check, so an advance that happens in between makes
   * {@link Phaser#awaitAdvance(int)} return immediately rather than being missed.
   */
  private void awaitReadinessOnce() {
    int observedPhase = phaser.getPhase();
    if (outboundObserver.isReady()) {
      return;
    }
    phaser.awaitAdvance(observedPhase);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java new file mode 100644 index 0000000..ef641b0 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.stream; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; + +/** + * A {@link ClientResponseObserver} which delegates all {@link StreamObserver} calls. + * + * <p>Used to wrap existing {@link StreamObserver}s to be able to install an + * {@link ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler}. + * + * <p>This is as thread-safe as the underlying stream observer that is being wrapped.
+ */
final class ForwardingClientResponseObserver<ReqT, RespT>
    implements ClientResponseObserver<RespT, ReqT> {
  // Receives every onNext/onError/onCompleted call made on this observer, unchanged.
  private final StreamObserver<ReqT> delegate;
  // Registered on the call stream in beforeStart.
  private final Runnable readyCallback;

  ForwardingClientResponseObserver(
      StreamObserver<ReqT> inboundObserver, Runnable onReadyHandler) {
    this.readyCallback = onReadyHandler;
    this.delegate = inboundObserver;
  }

  @Override
  public void onNext(ReqT value) {
    delegate.onNext(value);
  }

  @Override
  public void onError(Throwable t) {
    delegate.onError(t);
  }

  @Override
  public void onCompleted() {
    delegate.onCompleted();
  }

  /** Installs the ready callback on {@code stream} via {@code setOnReadyHandler}. */
  @Override
  public void beforeStart(ClientCallStreamObserver<RespT> stream) {
    stream.setOnReadyHandler(readyCallback);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java new file mode 100644 index 0000000..9326e11 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.stream; + +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +
/**
 * Picks the outbound {@link StreamObserver} implementation from the experiments configured in
 * {@link PipelineOptions}.
 *
 * <p>When the {@code beam_fn_api_buffered_stream} experiment is present, streams are wrapped in a
 * {@link BufferingStreamObserver}. Its buffer size can be overridden with
 * {@code beam_fn_api_buffered_stream_buffer_size=<n>`; if that experiment appears more than once,
 * the last one wins. Otherwise streams are wrapped in a {@link DirectStreamObserver}.
 */
public abstract class StreamObserverFactory {
  private static final String BUFFERED_STREAM_EXPERIMENT = "beam_fn_api_buffered_stream";
  private static final String BUFFER_SIZE_EXPERIMENT_PREFIX =
      "beam_fn_api_buffered_stream_buffer_size=";

  public static StreamObserverFactory fromOptions(PipelineOptions options) {
    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
    boolean useBuffering =
        experiments != null && experiments.contains(BUFFERED_STREAM_EXPERIMENT);
    if (!useBuffering) {
      return new Direct();
    }
    int bufferSize = Buffered.DEFAULT_BUFFER_SIZE;
    for (String experiment : experiments) {
      if (experiment.startsWith(BUFFER_SIZE_EXPERIMENT_PREFIX)) {
        // No early exit: a later matching experiment overrides an earlier one.
        String sizeText = experiment.substring(BUFFER_SIZE_EXPERIMENT_PREFIX.length());
        bufferSize = Integer.parseInt(sizeText);
      }
    }
    ExecutorService executor = options.as(GcsOptions.class).getExecutorService();
    return new Buffered(executor, bufferSize);
  }

  public abstract <ReqT, RespT> StreamObserver<RespT> from(
      Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
      StreamObserver<ReqT> responseObserver);

  /**
   * Opens the client stream through {@code clientFactory}. Inbound calls are forwarded to
   * {@code inboundObserver}, and {@code phaser} receives an arrival from the installed
   * on-ready handler.
   */
  @SuppressWarnings("unchecked")
  private static <ReqT, RespT> CallStreamObserver<RespT> openOutbound(
      Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
      StreamObserver<ReqT> inboundObserver,
      AdvancingPhaser phaser) {
    StreamObserver<RespT> outbound =
        clientFactory.apply(
            new ForwardingClientResponseObserver<ReqT, RespT>(inboundObserver, phaser::arrive));
    return (CallStreamObserver<RespT>) outbound;
  }

  /** Factory for {@link DirectStreamObserver}s. */
  private static class Direct extends StreamObserverFactory {

    @Override
    public <ReqT, RespT> StreamObserver<RespT> from(
        Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
        StreamObserver<ReqT> inboundObserver) {
      // A single registered party, so every on-ready arrival advances the phase.
      AdvancingPhaser phaser = new AdvancingPhaser(1);
      CallStreamObserver<RespT> outboundObserver =
          openOutbound(clientFactory, inboundObserver, phaser);
      return new DirectStreamObserver<>(phaser, outboundObserver);
    }
  }

  /** Factory for {@link BufferingStreamObserver}s sharing one executor and buffer size. */
  private static class Buffered extends StreamObserverFactory {
    private static final int DEFAULT_BUFFER_SIZE = 64;
    private final ExecutorService executorService;
    private final int bufferSize;

    private Buffered(ExecutorService executorService, int bufferSize) {
      this.executorService = executorService;
      this.bufferSize = bufferSize;
    }

    @Override
    public <ReqT, RespT> StreamObserver<RespT> from(
        Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
        StreamObserver<ReqT> inboundObserver) {
      // A single registered party, so every on-ready arrival advances the phase.
      AdvancingPhaser phaser = new AdvancingPhaser(1);
      CallStreamObserver<RespT> outboundObserver =
          openOutbound(clientFactory, inboundObserver, phaser);
      return new BufferingStreamObserver<>(
          phaser, outboundObserver, executorService, bufferSize);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java new file mode 100644 index 0000000..df4042c --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * gRPC stream management. + */ +package org.apache.beam.fn.harness.stream;
