[ https://issues.apache.org/jira/browse/QUARKS-166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276399#comment-15276399 ]
Dale LaBossiere commented on QUARKS-166:
----------------------------------------
Thanks for creating the test cases.
> Add Gate plumbing
> -----------------
>
> Key: QUARKS-166
> URL: https://issues.apache.org/jira/browse/QUARKS-166
> Project: Quarks
> Issue Type: New Feature
> Reporter: Dale LaBossiere
> Assignee: Cazen Lee
>
> As part of an initial experimental implementation of QUARKS-156 (concurrent
> analytics / barrier), I needed a stream Gate mechanism: a way to
> control the release of tuples into an output stream. It isn't currently used.
> If there's +1 sentiment for adding this to PlumbingStreams, here's the code:
> /**
> * Control the flow of tuples to an output stream.
> * <P>
> * A {@link Semaphore} is used to control the flow of tuples
> * through the {@code gate}. The gate acquires a permit from the
> * semaphore to pass the tuple through, blocking until a permit is
> * acquired (and applying backpressure upstream while blocked).
> * Elsewhere, some code calls {@link Semaphore#release(int)}
> * to make permits available.
> * </P><P>
> * If a TopologyProvider is used that can distribute a topology's
> * streams to different JVMs, the gate and the code releasing the
> * permits must be in the same JVM.
> * </P><P>
> * Sample use:
> * <BR>
> * Suppose you want to control processing such that concurrent
> * pipelines process each tuple in lock-step.
> * I.e., you want all of the pipelines to start processing a tuple
> * at the same time and not start a new tuple until the current
> * tuple has been fully processed by each of them:
> * <pre>{@code
> * TStream<Integer> readings = ...;
> *
> * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
> * TStream<Integer> gated = gate(readings, gateControl);
> *
> * // Create the concurrent pipeline combiner and have it
> * // signal that concurrent processing of the tuple has completed.
> * // In this sample the combiner just returns the received list of
> * // each pipeline result.
> * Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
> *     stream -> stream.map(
> *         list -> {
> *             gateControl.release();
> *             return list;
> *         });
> * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
> * }</pre>
> * </P>
> * @param stream the input stream
> * @param semaphore gate control
> * @return gated stream
> */
> public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore)
> {
>     return stream.map(tuple -> {
>         try {
>             semaphore.acquire();
>             return tuple;
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>             throw new RuntimeException("interrupted", e);
>         }
>     });
> }
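
For anyone who wants to try the gating behaviour outside a topology, here is a minimal standalone sketch using only java.util.concurrent. It is not part of the proposed patch. The names GateSketch and maxInFlight are hypothetical, a plain Function stands in for the stream map, and a BlockingQueue stands in for the downstream pipelines and combiner. The source thread can only push a tuple after acquiring a permit, and the consumer releases one permit per tuple it finishes, so at most one tuple should be in flight at a time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical standalone illustration; not Quarks API.
class GateSketch {

    // Same body as the proposed gate(), but returned as a plain Function
    // instead of being applied to a TStream (assumption for this sketch).
    static <T> Function<T, T> gate(Semaphore semaphore) {
        return tuple -> {
            try {
                semaphore.acquire();   // block until a permit is released
                return tuple;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted", e);
            }
        };
    }

    // Pushes tupleCount integers through the gate on a source thread while the
    // calling thread plays the combiner: it finishes a tuple, then releases a
    // permit. Returns the largest number of tuples observed in flight at once.
    static int maxInFlight(int tupleCount) {
        Semaphore gateControl = new Semaphore(1);  // allow the first to pass through
        Function<Integer, Integer> gate = gate(gateControl);
        BlockingQueue<Integer> downstream = new LinkedBlockingQueue<>();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        Thread source = new Thread(() -> {
            for (int i = 0; i < tupleCount; i++) {
                Integer tuple = gate.apply(i);     // waits here for a permit
                max.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                downstream.add(tuple);
            }
        });
        source.start();

        try {
            for (int i = 0; i < tupleCount; i++) {
                downstream.take();                 // "processing" of the tuple completes
                inFlight.decrementAndGet();
                gateControl.release();             // let the next tuple through
            }
            source.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);
        }
        return max.get();
    }
}
```

Starting the semaphore with more than one permit would let that many tuples pass before the gate first blocks, which is why the Javadoc sample uses `new Semaphore(1)` for strict lock-step.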
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)