[ 
https://issues.apache.org/jira/browse/FLINK-4855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709090#comment-15709090
 ] 

Stephan Ewen commented on FLINK-4855:
-------------------------------------

I think this is a good idea, I have heard this being asked a couple of times.

[~greghogan] It could be discussed on the mailing list, true. In this case, it 
is a very small addition, so discussing it in this issue would be probably okay.
Probably the biggest remaining question is how to call the new function.

My suggestion for the name would be `reKeyBy()`, because it expresses that this 
should be used on streams that were keyed before.

I recently wrote a utility function for a user. The utility function does 
re-keying for window functions, here is the code, usable as a utility function 
as a temporary way to realize that.
{code}
public static <T, K, W extends Window, R> SingleOutputStreamOperator<R> 
reKeyAndWindow(
                DataStream<T> input,
                KeySelector<T, K> keySelector,
                WindowAssigner<? super T, W> windowAssigner,
                WindowFunction<T, R, K, W> function) {

        Trigger<? super T, ? super W> trigger = 
windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());

        return reKeyAndWindow(input, keySelector, windowAssigner, trigger, 
function);
}

public static <T, K, W extends Window, R> SingleOutputStreamOperator<R> 
reKeyAndWindow(
                DataStream<T> input,
                KeySelector<T, K> keySelector,
                WindowAssigner<? super T, W> windowAssigner,
                Trigger<? super T, ? super W> trigger,
                WindowFunction<T, R, K, W> function) {

        TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
                        function, WindowFunction.class, true, true, 
input.getType(), null, false);

        TypeInformation<K> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType());

        return reKeyAndWindow(input, keySelector, windowAssigner, trigger, 
function, resultType, keyType);
}

public static <T, K, W extends Window, R> SingleOutputStreamOperator<R> 
reKeyAndWindow(
                DataStream<T> input,
                KeySelector<T, K> keySelector,
                WindowAssigner<? super T, W> windowAssigner,
                Trigger<? super T, ? super W> trigger,
                WindowFunction<T, R, K, W> function,
                TypeInformation<R> resultType,
                TypeInformation<K> keyType) {

        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        String callLocation = Utils.getCallLocationName();
        String udfName = "WindowedStream." + callLocation;

        ListStateDescriptor<T> stateDesc = new 
ListStateDescriptor<>("window-contents",
                        input.getType().createSerializer(env.getConfig()));

        String opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + 
", " + trigger + ", " + udfName + ")";

        WindowOperator<K, T, Iterable<T>, R, W> operator =
                        new WindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(env.getConfig()),
                                        keySelector,
                                        
keyType.createSerializer(env.getConfig()),
                                        stateDesc,
                                        new 
InternalIterableWindowFunction<>(function),
                                        trigger,
                                        0L); // last parameter is the allowed 
lateness

        SingleOutputStreamOperator<R> result = input.transform(opName, 
resultType, operator);

        OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) 
result.getTransformation();
        transform.setStateKeySelector(keySelector);
        transform.setStateKeyType(keyType);

        return result;
}

// -------------- example usage ---------------

public static void main(String[] args) throws Exception {

        final KeySelector<Tuple2<Long, Long>, Long> keySelector = (value) -> 
value.f0;

        DataStream<Tuple2<Long, Long>> stream = env
                        .generateSequence(1, 100000000000L)
                        .map( (value) -> new Tuple2<>(value / 10000, 1L) );

        DataStream<Tuple2<Long, Long>> perSecondWindows = stream
                        .keyBy(keySelector)
                        .timeWindow(Time.seconds(1))
                        .sum(1);

        DataStream<Tuple2<Long, Long>> perFiveSecondWindows = reKeyAndWindow(
                        perSecondWindows,
                        keySelector,
                        TumblingEventTimeWindows.of(Time.seconds(5)),
                        new WindowFunction<Tuple2<Long,Long>, Tuple2<Long, 
Long>, Long, TimeWindow>() { ... }
                        );
}
{code}


> Add partitionedKeyBy to DataStream
> ----------------------------------
>
>                 Key: FLINK-4855
>                 URL: https://issues.apache.org/jira/browse/FLINK-4855
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Xiaowei Jiang
>            Assignee: MaGuowei
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> After we do any interesting operations (e.g. reduce) on KeyedStream, the 
> result becomes DataStream. In a lot of cases, the output still has the same 
> or compatible keys with the KeyedStream (logically). But to do further 
> operations on these keys, we are forced to use keyby again. This works 
> semantically, but is costly in two aspects. First, it destroys the 
> possibility of chaining, which is one of the most important optimization 
> technique. Second, keyby will greatly expand the connected components of 
> tasks, which has implications in failover optimization.
> To address this shortcoming, we propose a new operator partitionedKeyBy.
> DataStream {
>     public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
> }
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to 
> DataStream.keyBy(partitionedKey) where partitionedKey is key plus the taskid 
> as an extra field. This guarantees that records from different tasks will 
> never produce the same keys.
> With this, it's possible to do
> ds.keyBy(key1).reduce(func1)
>     .partitionedKeyBy(key1).reduce(func2)
>     .partitionedKeyBy(key2).reduce(func3);
> Most importantly, in certain cases, we will be able to chains these into a 
> single vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to