Hi Lei,

I’m afraid there is currently no API for doing this in one operation. I see two 
options right now:

1. Build a custom operator that implements windowing and also has a second 
input for the parameter stream. This would be an implementation of the 
TwoInputStreamOperator interface. As an example, you can look at 
KeyedCoProcessOperator, which is the operator implementation for a two-input 
process function (CoProcessFunction). This variant gives you the most 
flexibility, but it’s a bit involved.

2. Use two separate steps, i.e. first do the windowed operation and then have a 
second operation that combines the window results with the parameter stream. 
Something like this:

DataStream<T> input = …;
DataStream<P> parameterStream = …;

input
  .keyBy(…)
  .window(…)
  .reduce(…)  // or .process(…) / .apply(…): the windowed operation you want to perform
  .connect(parameterStream.broadcast())  // broadcast so every parallel instance sees all parameters
  .process(new MyCoProcessFunction());

Here, MyCoProcessFunction would receive the results of the windowed operation 
on input 1 and the parameter stream on input 2. The function would keep state 
based on the parameter stream and process elements that come in on input 1 
based on this state. You should checkpoint this state; see 
CheckpointedFunction, and especially OperatorStateStore.getUnionListState().

Union list state works like this: each parallel operator instance can put a 
bunch of things in state. When checkpointing, the state of all parallel 
instances is collected and checkpointed. When restoring (after a failure, for 
example), all of that state is sent to each parallel operator instance. In your 
case, I’m assuming that the parameter stream should be broadcast, so that all 
parallel operator instances get the same input and therefore have the same 
state. You would then only checkpoint the state of parallel operator 
instance 0. When restoring, this state is distributed to all parallel 
instances, so they all have the same state again.
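
The restore behavior described above can be modelled in a few lines of plain 
Java. This is only a sketch under stated assumptions, not Flink's 
implementation: UnionStateModel, snapshot() and restore() are hypothetical 
names, and a List<String> stands in for each instance's list state. In a real 
CheckpointedFunction, the "am I instance 0?" check would presumably use 
getRuntimeContext().getIndexOfThisSubtask().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical plain-Java model of union list state redistribution,
 * assuming every parallel instance holds the same broadcast parameters.
 */
class UnionStateModel {

    // Checkpoint: collect each instance's contribution; only instance 0 writes its state
    static List<List<String>> snapshot(List<List<String>> instanceStates) {
        List<List<String>> contributions = new ArrayList<>();
        for (int i = 0; i < instanceStates.size(); i++) {
            contributions.add(i == 0 ? new ArrayList<>(instanceStates.get(i)) : new ArrayList<>());
        }
        return contributions;
    }

    // Restore (possibly with a different parallelism): every instance receives the union
    static List<List<String>> restore(List<List<String>> contributions, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> c : contributions) {
            union.addAll(c);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Three instances with identical broadcast parameter state (hypothetical values)
        List<String> params = Arrays.asList("filter=ERROR", "threshold=5");
        List<List<String>> states = Arrays.asList(params, params, params);
        // Restore onto four instances; each gets exactly one copy of the parameters
        System.out.println(restore(snapshot(states), 4));
    }
}
```

If every instance wrote its (identical) parameters instead, each restored 
instance would receive one copy per old instance, which is why only 
instance 0 contributes to the checkpoint.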

Does that help?

Best,
Aljoscha

> On 30. Jun 2017, at 21:22, Lei Chen <ley...@gmail.com> wrote:
> 
> Hi,
> 
> In my scenario I have 2 streams. DS1 is the main data stream, reading logs
> from Kafka, and DS2 is a parameter stream which is used to maintain state
> about all processing parameters (including filters) that need to be applied
> at runtime by DS1. The processing parameters can be changed at any time
> while the job is running.
> 
> DS1 is a windowed stream; DS2 is just a normal, non-keyed stream. How can I
> connect these 2 streams so that DS1 can apply those parameters in its
> window function by reading the up-to-date parameter state maintained by DS2?
> 
> 
> thanks
> Lei
