Hi Lei, I’m afraid there is currently no API for doing this in one operation. I see two options right now:
1. Build a custom operator that implements windowing and also has a second input for the parameter stream. This would be a subclass of TwoInputStreamOperator. As an example, you can look at KeyedCoProcessOperator, which is the operator implementation for a two-input process function (CoProcessFunction). This variant gives you the most flexibility, but it's a bit involved.

2. Use two separate steps, i.e. first do the windowed operation and then have a second operation that combines the window results with the parameter stream. Something like this:

    DataStream<T> input = …;
    DataStream<P> parameterStream = …;

    input
      .keyBy(…)
      .window(…)
      .reduce()/process()/apply() // the operation that you want to perform
      .connect(parameterStream)
      .process(new MyCoProcessFunction());

Here MyCoProcessFunction would receive the results of the windowed operation on input 1 and the parameter stream on input 2. The function would keep state based on the parameter stream (you should checkpoint this state; see CheckpointedFunction, and especially OperatorStateStore.getUnionListState()) and process elements that come in on input 1 based on this state.

Union list state works like this: each parallel operator instance can put a bunch of things into state. When checkpointing, the state of all parallel instances is collected and checkpointed. When restoring (after a failure, for example), all of that state is sent to each parallel operator instance. In your case (I'm assuming that the parameter stream should be broadcast, so that all parallel operator instances get the same input and therefore have the same state) you would only checkpoint the state of parallel operator instance 0. When restoring, this would be distributed to all operators, and they would therefore all have the same state again.

Does that help?

Best,
Aljoscha

> On 30. Jun 2017, at 21:22, Lei Chen <ley...@gmail.com> wrote:
>
> Hi,
>
> In my scenario I have 2 streams.
> DS1 is main data stream reading logs from kafka, and DS2 is a parameter
> stream which is used to maintain a state about all processing parameters
> (including filters) that need to be applied at runtime by DS1. The
> processing parameters can be changed anytime while the job is running.
>
> DS1 is a windowed stream, DS2 is just a non-keyed normal stream. How to
> connect these 2 streams together so DS1 can apply those parameters in its
> window function by reading up-to-date parameter state maintained by DS2?
>
> thanks
> Lei
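P.S. To make the union-list-state semantics from my answer concrete, here is a small self-contained simulation in plain Java (this is NOT the Flink API; the Instance class and the checkpoint/restore helpers are hypothetical stand-ins for what Flink does internally with getUnionListState()):

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of union list state semantics: each parallel instance holds a
// local list; a checkpoint collects the lists of ALL instances into one union
// list; a restore hands the complete union back to EVERY instance.
public class UnionListStateDemo {

    static class Instance {
        final int index;
        List<String> state = new ArrayList<>();
        Instance(int index) { this.index = index; }
    }

    // Checkpoint: gather the state of all parallel instances into one list.
    static List<String> checkpoint(List<Instance> instances) {
        List<String> union = new ArrayList<>();
        for (Instance i : instances) union.addAll(i.state);
        return union;
    }

    // Restore: every instance receives the complete union list.
    static void restore(List<Instance> instances, List<String> snapshot) {
        for (Instance i : instances) i.state = new ArrayList<>(snapshot);
    }

    public static void main(String[] args) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < 3; i++) instances.add(new Instance(i));

        // The trick from the answer: with a broadcast parameter stream every
        // instance sees the same parameters, so only instance 0 writes them
        // to state -- otherwise the union would contain duplicates.
        instances.get(0).state.add("filter=ERROR");

        List<String> snapshot = checkpoint(instances);
        System.out.println("snapshot size: " + snapshot.size()); // 1, no duplicates

        // After a failure, each instance gets the full union back and all
        // instances end up with identical parameter state again.
        restore(instances, snapshot);
        System.out.println("instance 2 state: " + instances.get(2).state);
    }
}
```

If every instance wrote its (identical) parameters to state, the restored union list would contain one copy per parallel instance, which is why the sketch writes only from instance 0.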