[
https://issues.apache.org/jira/browse/QUARKS-166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15277311#comment-15277311
]
ASF GitHub Bot commented on QUARKS-166:
---------------------------------------
GitHub user Cazen reopened a pull request:
https://github.com/apache/incubator-quarks/pull/109
[QUARKS-166] Add Gate plumbing
Hmm The sample code in QUARKS-166 looks clearly. I just added test case.
Any advice would be appreciated :)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/Cazen/incubator-quarks QUARKS-166
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-quarks/pull/109.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #109
----
commit b41664db2b561e101b16211a063e74f7671475c2
Author: cazen.lee <[email protected]>
Date: 2016-05-08T05:47:32Z
Add Gate plumbing
commit f38780ce2bedb6296cf8643be169c43f0b3a5d25
Author: cazen.lee <[email protected]>
Date: 2016-05-08T05:48:56Z
Add testcase
commit 8389ad73cbea27670e4e9ade96a56ed24236decb
Author: Cazen Lee <[email protected]>
Date: 2016-05-09T14:29:34Z
remove condition check for timing variances failure
----
> Add Gate plumbing
> -----------------
>
> Key: QUARKS-166
> URL: https://issues.apache.org/jira/browse/QUARKS-166
> Project: Quarks
> Issue Type: New Feature
> Reporter: Dale LaBossiere
> Assignee: Cazen Lee
>
> As part of an initial experimental implementation of QUARKS-156 concurrent
> analytics / barrier, I had a need for a stream Gate mechanism - a way to
> control the release of tuples into an output stream. It's not used now.
> If there's a +1 sentiment for adding this to PlumbingStreams here's the code:
> /**
> * Control the flow of tuples to an output stream.
> * <P>
> * A {@link Semaphore} is used to control the flow of tuples
> * through the {@code gate}. The gate acquires a permit from the
> * semaphore to pass the tuple through, blocking until a permit is
> * acquired (and applying backpressure upstream while blocked).
> * Elsewhere, some code calls {@link Semaphore#release(int)}
> * to make permits available.
> * </P><P>
> * If a TopologyProvider is used that can distribute a topology's
> * streams to different JVM's the gate and the code releasing the
> * permits must be in the same JVM.
> * </P><P>
> * Sample use:
> * <BR>
> * Suppose you wanted to control processing such that concurrent
> * pipelines processed each tuple in lock-step.
> * I.e., You want all of the pipelines to start processing a tuple
> * at the same time and not start a new tuple until the current
> * tuple had been fully processed by each of them:
> * <pre>{@code
> * TStream<Integer> readings = ...;
> *
> * Semaphore gateControl = new Semaphore(1); // allow the first to pass
> through
> * TStream<Integer> gated = gate(readings, gateControl);
> *
> * // Create the concurrent pipeline combiner and have it
> * // signal that concurrent processing of the tuple has completed.
> * // In this sample the combiner just returns the received list of
> * // each pipeline result.
> * Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
> * stream -> stream.map(
> * list -> {
> * gateControl.release();
> * return list;
> * });
> * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated,
> pipelines, combiner);
> * }</pre>
> * </P>
> * @param stream the input stream
> * @param semaphore gate control
> * @return gated stream
> */
> public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore)
> {
> return stream.map(tuple -> {
> try {
> semaphore.acquire();
> return tuple;
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> throw new RuntimeException("interrupted", e);
> }});
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)