[
https://issues.apache.org/jira/browse/QUARKS-91?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Will Marshall updated QUARKS-91:
--------------------------------
Description:
Quarks should support the ability to join a stream against another stream or
window. The interface would look something like this:
{code}
<J, U, K> TStream<J> join(Function<T, K> keyer, TWindow<U, K> window,
        BiFunction<T, List<U>, J> joiner);

<J, U, K> TStream<J> joinLast(TStream<U> lastStream,
        BiFunction<T, U, J> joiner);

<J, U, K> TStream<J> joinLast(Function<T, K> keyer, TStream<U> lastStream,
        Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner);
{code}
For *join(Function<T, K> keyer, TWindow<U, K> window, BiFunction<T, List<U>, J>
joiner)*:
Join this stream with a partitioned window of tuples of type U, keyed by type
K. Each tuple on this stream is joined with the contents of the window
partition for the key keyer.apply(tuple). The tuple and the partition contents
(a List<U>) are passed to joiner, and the return value is submitted to the
returned stream. If joiner returns null, no tuple is submitted.
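The behaviour described above can be modelled in plain Java, without any Quarks types. This is only a sketch of the semantics: the class name WindowJoinSketch, the use of a Map<K, List<U>> as a stand-in for the window's partitions, and the choice to pass an empty list when a key has no partition are all assumptions, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of the join semantics described above; not the Quarks API.
final class WindowJoinSketch {

    // "partitions" is a hypothetical stand-in for the contents of a
    // TWindow<U, K>: one list of tuples per key.
    static <T, U, K, J> List<J> join(List<T> stream,
                                     Function<T, K> keyer,
                                     Map<K, List<U>> partitions,
                                     BiFunction<T, List<U>, J> joiner) {
        List<J> submitted = new ArrayList<>();
        for (T tuple : stream) {
            K key = keyer.apply(tuple);
            // Assumption: a key with no partition yields an empty list.
            List<U> contents =
                partitions.getOrDefault(key, Collections.<U>emptyList());
            J joined = joiner.apply(tuple, contents);
            if (joined != null) { // a null result submits nothing
                submitted.add(joined);
            }
        }
        return submitted;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> partitions = new HashMap<>();
        partitions.put("a", Arrays.asList(1, 2, 3));
        partitions.put("b", Arrays.asList(10));

        List<String> out = join(
            Arrays.asList("a", "b", "c"),
            s -> s,
            partitions,
            (t, us) -> us.isEmpty()
                ? null
                : t + "=" + us.stream().mapToInt(Integer::intValue).sum());

        System.out.println(out); // [a=6, b=10]
    }
}
```

The tuple "c" has no partition, so the joiner returns null and nothing is submitted for it.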
For *joinLast(Function<T, K> keyer, TStream<U> lastStream, Function<U, K>
lastStreamKeyer, BiFunction<T, U, J> joiner)*:
Join this stream with the last tuple seen on a partitioned stream of type U.
Each tuple t on this stream is joined with the last tuple u seen on lastStream
that has a matching key (of type K), that is, where
keyer.apply(t).equals(lastStreamKeyer.apply(u)) is true. The key class is
assumed to correctly implement the contract for equals() and hashCode(). The
pair (t, u) is passed to joiner, and the return value is submitted to the
returned stream. If joiner returns null, no tuple is submitted.
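A minimal plain-Java sketch of the keyed joinLast semantics follows. The class KeyedJoinLastSketch and its method names are hypothetical, and it is single-threaded for simplicity. The description does not say what happens when no tuple with a matching key has been seen yet; this sketch assumes null is passed to joiner in that case, mirroring the unkeyed variant.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of the keyed joinLast semantics; not the Quarks API.
final class KeyedJoinLastSketch<T, U, K, J> {
    private final Function<T, K> keyer;
    private final Function<U, K> lastStreamKeyer;
    private final BiFunction<T, U, J> joiner;
    // Relies on K implementing equals() and hashCode() correctly.
    private final Map<K, U> lastByKey = new HashMap<>();

    KeyedJoinLastSketch(Function<T, K> keyer,
                        Function<U, K> lastStreamKeyer,
                        BiFunction<T, U, J> joiner) {
        this.keyer = keyer;
        this.lastStreamKeyer = lastStreamKeyer;
        this.joiner = joiner;
    }

    // Called for each tuple arriving on lastStream.
    void acceptLast(U u) {
        lastByKey.put(lastStreamKeyer.apply(u), u);
    }

    // Called for each tuple arriving on this stream. A null return value
    // means nothing is submitted. Assumption: u is null if no tuple with a
    // matching key has been seen yet.
    J accept(T t) {
        return joiner.apply(t, lastByKey.get(keyer.apply(t)));
    }

    public static void main(String[] args) {
        KeyedJoinLastSketch<String, String, String, String> join =
            new KeyedJoinLastSketch<>(
                t -> t,
                u -> u.substring(0, u.indexOf(':')),
                (t, u) -> u == null
                    ? null
                    : t + "@" + u.substring(u.indexOf(':') + 1));

        join.acceptLast("x:1");
        join.acceptLast("y:5");
        join.acceptLast("x:2"); // replaces "x:1" as the last tuple for key x

        System.out.println(join.accept("x")); // x@2
        System.out.println(join.accept("y")); // y@5
    }
}
```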
For *joinLast(TStream<U> lastStream, BiFunction<T, U, J> joiner)*:
Join this stream with the last tuple seen on a stream of type U. Each tuple on
this stream is joined with the last tuple seen on lastStream. Both tuples are
passed to joiner, and the return value is submitted to the returned stream. If
joiner returns null, no tuple is submitted.
This is a simplified version of join(Function, TWindow, BiFunction): the window
contents are passed as a single tuple of type U rather than as a list
containing one tuple. If no tuple has been seen on lastStream, null is passed
as the second argument to joiner.
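The unkeyed variant can be sketched the same way. JoinLastSketch and its method names are hypothetical, and the sketch is single-threaded; a real implementation would have to consider tuples arriving concurrently on the two streams.

```java
import java.util.function.BiFunction;

// Plain-Java model of the unkeyed joinLast semantics; not the Quarks API.
final class JoinLastSketch<T, U, J> {
    private final BiFunction<T, U, J> joiner;
    private U last; // stays null until lastStream delivers its first tuple

    JoinLastSketch(BiFunction<T, U, J> joiner) {
        this.joiner = joiner;
    }

    // Called for each tuple arriving on lastStream.
    void acceptLast(U u) {
        last = u;
    }

    // Called for each tuple arriving on this stream. A null return value
    // means nothing is submitted.
    J accept(T t) {
        return joiner.apply(t, last);
    }

    public static void main(String[] args) {
        JoinLastSketch<String, Integer, String> join =
            new JoinLastSketch<>(
                (t, u) -> u == null ? t + "/none" : t + "/" + u);

        System.out.println(join.accept("a")); // a/none
        join.acceptLast(1);
        join.acceptLast(2);
        System.out.println(join.accept("b")); // b/2
    }
}
```

Because the joiner receives null before lastStream has produced anything, it has to handle that case itself, as the lambda in main does.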
*Issues:*
- The definition of Oplet<I, O> requires that all input tuples be of the same
type. If streamA and streamB are of different types, it's not clear how to
create a "join" oplet where one input port has the tuples of streamA and
another has the tuples of streamB.
- The windowing library doesn't currently support accessing the contents of a
partition from a quarks.window.Window reference.
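One possible way around the single input type in the first issue, offered only as a sketch, is to wrap tuples from both streams in a common tagged type so that an oplet typed on that wrapper can receive either kind of tuple and dispatch on the tag. The EitherSketch class below is hypothetical and not part of Quarks; whether this fits the Oplet design is an open question.

```java
import java.util.function.Function;

// Hypothetical tagged wrapper: holds either a tuple of type A (from streamA)
// or a tuple of type B (from streamB). Not part of Quarks.
final class EitherSketch<A, B> {
    private final A left;
    private final B right;
    private final boolean isLeft;

    private EitherSketch(A left, B right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <A, B> EitherSketch<A, B> left(A a) {
        return new EitherSketch<>(a, null, true);
    }

    static <A, B> EitherSketch<A, B> right(B b) {
        return new EitherSketch<>(null, b, false);
    }

    // Dispatch on the tag: exactly one of the two functions is applied.
    <R> R fold(Function<A, R> onLeft, Function<B, R> onRight) {
        return isLeft ? onLeft.apply(left) : onRight.apply(right);
    }

    public static void main(String[] args) {
        EitherSketch<String, Integer> fromA = EitherSketch.left("t");
        EitherSketch<String, Integer> fromB = EitherSketch.right(7);

        System.out.println(fromA.fold(s -> "A:" + s, i -> "B:" + i)); // A:t
        System.out.println(fromB.fold(s -> "A:" + s, i -> "B:" + i)); // B:7
    }
}
```

With such a wrapper, both streams would first be mapped to EitherSketch<T, U>, and the join logic would update its "last" state on right values and call joiner on left values.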
was:
Quarks should support the ability to join a stream against another stream or
window. The interface would look something like this:
{code}
<J, U, K> TStream<J> join(Function<T, K> keyer, TWindow<U, K> window,
BiFunction<T, List<U>, J> joiner);
<J, U, K> TStream<J> joinLast(TStream<U> lastStream, BiFunction<T, U, J>
joiner);
<J, U, K> TStream<J> joinLast(Function<T, K> keyer, TStream<U> lastStream,
BiFunction<T, U, J> joiner);
{code}
For a stream and a window, streamA and windowB: *streamA.join(keyer, windowB,
joiner)* would call *joiner* on every tuple on streamA with the contents of the
partition associated with the key *keyer.apply(tuple)*.
For two streams, streamA and streamB: *streamA.joinLast(keyer, streamB,
joiner)* would call the supplied *joiner* function on the most recent value of
streamA and the most recent value of streamB whose key matched
*keyer.apply(tuple)*. This happens whenever a tuple passes through streamA.
For two streams, streamA and streamB: *streamA.joinLast(streamB, joiner)* would
call the supplied *joiner* function on the most recent values of streamA and
streamB whenever a tuple passes through streamA.
*Issues:*
- The definition of Oplet<I, O> requires that all input tuples be of the same
type. If streamA and streamB are of different types, it's not clear how to
create a "join" oplet where one input port has the tuples of streamA and
another has the tuples of streamB.
- The windowing library doesn't currently support accessing the contents of a
partition from a quarks.window.Window reference.
> Support joining a stream against another stream/window
> ------------------------------------------------------
>
> Key: QUARKS-91
> URL: https://issues.apache.org/jira/browse/QUARKS-91
> Project: Quarks
> Issue Type: Improvement
> Reporter: Will Marshall
> Assignee: Will Marshall