[ 
https://issues.apache.org/jira/browse/QUARKS-91?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Will Marshall updated QUARKS-91:
--------------------------------
    Description: 
Quarks should support the ability to join a stream against another stream or 
window. The interface would look something like this:
{code}
    <J, U, K> TStream<J> join(Function<T, K> keyer, TWindow<U, K> window,
            BiFunction<T, List<U>, J> joiner);

    <J, U> TStream<J> joinLast(TStream<U> lastStream,
            BiFunction<T, U, J> joiner);

    <J, U, K> TStream<J> joinLast(Function<T, K> keyer, TStream<U> lastStream,
            Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner);
{code}

For *join(Function<T, K> keyer, TWindow<U, K> window, BiFunction<T, List<U>, J> joiner)*:
Join this stream with a partitioned window of type U with key type K. Each
tuple on this stream is joined with the contents of the window partition for
the key keyer.apply(tuple). The tuple and those contents (a List<U>) are passed
to joiner, and the return value is submitted to the returned stream. If joiner
returns null, no tuple is submitted.
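
The join semantics described above can be sketched in plain Java, without any Quarks types. This is only an illustration under stated assumptions: WindowJoinSketch, insertIntoWindow, accept and output are hypothetical names, the window never evicts tuples, and an empty list is passed to joiner when the partition has no contents (the description does not say what happens in that case).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for "stream joined against a partitioned window".
// Not the Quarks API: TStream and TWindow are replaced by plain method calls.
class WindowJoinSketch<T, U, K, J> {
    // Current window contents, one list per partition key (no eviction here).
    private final Map<K, List<U>> partitions = new HashMap<>();
    private final Function<T, K> keyer;
    private final BiFunction<T, List<U>, J> joiner;
    // Stands in for the returned stream.
    private final List<J> output = new ArrayList<>();

    WindowJoinSketch(Function<T, K> keyer, BiFunction<T, List<U>, J> joiner) {
        this.keyer = keyer;
        this.joiner = joiner;
    }

    // Simulates a tuple being inserted into the window partition for key.
    void insertIntoWindow(K key, U tuple) {
        partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
    }

    // Simulates a tuple arriving on "this" stream.
    void accept(T tuple) {
        K key = keyer.apply(tuple);
        // Assumption: an empty partition is presented as an empty list.
        List<U> contents = partitions.getOrDefault(key, Collections.emptyList());
        // Pass a copy so joiner cannot modify the window contents.
        J joined = joiner.apply(tuple, new ArrayList<>(contents));
        if (joined != null) {
            output.add(joined); // a null result submits nothing
        }
    }

    List<J> output() {
        return output;
    }
}
```

In this sketch a joiner that returns null for an empty list filters out tuples whose partition has no contents.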

For *joinLast(Function<T, K> keyer, TStream<U> lastStream, Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner)*:
Join this stream with the last tuple seen on a stream of type U, with
partitioning. Each tuple on this stream is joined with the last tuple seen on
lastStream that has a matching key (of type K). A tuple t on this stream
matches a tuple u on lastStream if
keyer.apply(t).equals(lastStreamKeyer.apply(u)) is true. The key classes are
assumed to correctly implement the contract for equals() and hashCode(). The
tuple t and the matching tuple u are passed to joiner, and the return value is
submitted to the returned stream. If joiner returns null, no tuple is submitted.
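
A minimal sketch of the keyed joinLast behaviour, again in plain Java rather than against the Quarks API. KeyedJoinLastSketch, acceptLast, accept and output are hypothetical names. The sketch assumes that null is passed to joiner when no tuple with a matching key has been seen yet; the description states this only for the unkeyed variant. It also shows why the equals() and hashCode() contract matters: the last tuple per key is kept in a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for the keyed joinLast; not the Quarks API.
class KeyedJoinLastSketch<T, U, K, J> {
    // Last tuple seen on lastStream for each key; relies on K's equals/hashCode.
    private final Map<K, U> lastByKey = new HashMap<>();
    private final Function<T, K> keyer;
    private final Function<U, K> lastStreamKeyer;
    private final BiFunction<T, U, J> joiner;
    // Stands in for the returned stream.
    private final List<J> output = new ArrayList<>();

    KeyedJoinLastSketch(Function<T, K> keyer, Function<U, K> lastStreamKeyer,
            BiFunction<T, U, J> joiner) {
        this.keyer = keyer;
        this.lastStreamKeyer = lastStreamKeyer;
        this.joiner = joiner;
    }

    // Simulates a tuple arriving on lastStream: it replaces the previous
    // tuple stored under the same key.
    void acceptLast(U u) {
        lastByKey.put(lastStreamKeyer.apply(u), u);
    }

    // Simulates a tuple arriving on "this" stream.
    void accept(T t) {
        // Assumption: null is passed when no tuple with this key has been seen.
        U last = lastByKey.get(keyer.apply(t));
        J joined = joiner.apply(t, last);
        if (joined != null) {
            output.add(joined); // a null result submits nothing
        }
    }

    List<J> output() {
        return output;
    }
}
```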

For *joinLast(TStream<U> lastStream, BiFunction<T, U, J> joiner)*:
Join this stream with the last tuple seen on a stream of type U. Each tuple on
this stream is joined with the last tuple seen on lastStream. Both tuples are
passed to joiner, and the return value is submitted to the returned stream. If
joiner returns null, no tuple is submitted.
This is a simplified version of join(Function, TWindow, BiFunction) in which
the window contents are passed to joiner as a single tuple of type U rather
than as a list containing one tuple. If no tuple has been seen on lastStream,
null is passed as the second argument to joiner.
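
The unkeyed joinLast can be sketched the same way. JoinLastSketch and its methods are hypothetical names, and the sketch is single-threaded; a real implementation would need to consider tuples arriving on the two streams concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for the unkeyed joinLast; not the Quarks API.
class JoinLastSketch<T, U, J> {
    // Last tuple seen on lastStream; stays null until the first one arrives.
    private U last;
    private final BiFunction<T, U, J> joiner;
    // Stands in for the returned stream.
    private final List<J> output = new ArrayList<>();

    JoinLastSketch(BiFunction<T, U, J> joiner) {
        this.joiner = joiner;
    }

    // Simulates a tuple arriving on lastStream.
    void acceptLast(U u) {
        last = u;
    }

    // Simulates a tuple arriving on "this" stream.
    void accept(T t) {
        // last is null if nothing has been seen on lastStream yet,
        // so joiner must be prepared for a null second argument.
        J joined = joiner.apply(t, last);
        if (joined != null) {
            output.add(joined); // a null result submits nothing
        }
    }

    List<J> output() {
        return output;
    }
}
```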

*Issues:*
- The definition of Oplet<I, O> requires that all input tuples be of the same 
type. If streamA and streamB are of different types, it's not clear how to 
create a "join" oplet where one input port has the tuples of streamA and 
another has the tuples of streamB.
- The windowing library doesn't currently support accessing the contents of a 
partition from a quarks.window.Window reference.

  was:
Quarks should support the ability to join a stream against another stream or 
window. The interface would look something like this:
{code}
    <J, U, K> TStream<J> join(Function<T, K> keyer, TWindow<U, K> window,
            BiFunction<T, List<U>, J> joiner);

    <J, U, K> TStream<J> joinLast(TStream<U> lastStream,
            BiFunction<T, U, J> joiner);

    <J, U, K> TStream<J> joinLast(Function<T, K> keyer, TStream<U> lastStream,
            BiFunction<T, U, J> joiner);
{code}

For a stream and a window, streamA and windowB: *streamA.join(keyer, windowB, 
joiner)* would call *joiner* on every tuple on streamA with the contents of the 
partition associated with the key *keyer.apply(tuple)*.

For two streams, streamA and streamB: *streamA.joinLast(keyer, streamB, 
joiner)* would call the supplied *joiner* function on the most recent value of 
streamA and the most recent value of streamB whose key matched 
*keyer.apply(tuple)*. This happens whenever a tuple passes through streamA.

For two streams, streamA and streamB: *streamA.joinLast(streamB, joiner)* would 
call the supplied *joiner* function on the most recent values of streamA and 
streamB whenever a tuple passes through streamA.

*Issues:*
- The definition of Oplet<I, O> requires that all input tuples be of the same 
type. If streamA and streamB are of different types, it's not clear how to 
create a "join" oplet where one input port has the tuples of streamA and 
another has the tuples of streamB.
- The windowing library doesn't currently support accessing the contents of a 
partition from a quarks.window.Window reference.


> Support joining a stream against another stream/window
> ------------------------------------------------------
>
>                 Key: QUARKS-91
>                 URL: https://issues.apache.org/jira/browse/QUARKS-91
>             Project: Quarks
>          Issue Type: Improvement
>            Reporter: Will Marshall
>            Assignee: Will Marshall



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
