[ https://issues.apache.org/jira/browse/FLINK-7424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Benedict Jin updated FLINK-7424:
--------------------------------
    Description: 
The `CEP` component makes `KeyedStream` choose the wrong channel.

The original KeySelector works correctly:
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
    return log -> {
        if (log == null) return 0;                            // null records all map to key 0
        Integer flumeId;
        if ((flumeId = log.getFlumeId()) == null) return 1;   // records without a Flume id map to key 1
        return flumeId;                                       // otherwise the Flume id is the key
    };
}
{code}

After the following change, the job throws a `Key group index out of range of key group range [16, 32)` exception:
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
    return new KeySelector<HBaseServerLog, Integer>() {
        private final Random r = new Random(System.nanoTime());

        @Override
        public Integer getKey(HBaseServerLog log) throws Exception {
            if (log == null) return 0;
            Integer flumeId;
            if ((flumeId = log.getFlumeId()) == null) return 1;
            // Either keeps flumeId or subtracts a random offset in [0, parallelism),
            // clamped at 0. New random values are drawn on every call.
            return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
        }
    };
}
{code}
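To make the behaviour of this second `KeySelector` concrete, here is a sketch that runs the same `getKey` logic in plain Java, without Flink, on one record many times. `RandomKeySketch` and `HBaseServerLogStub` are hypothetical stand-ins, not Flink or project classes. The fixed seed replaces `System.nanoTime()` only to make runs repeatable. Under these assumptions, a single record with `flumeId = 5` and `parallelism = 4` produces several different keys (drawn from 2, 3, 4 and 5) instead of one.

{code:java}
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Plain-Java stand-in for the second KeySelector above, runnable without Flink.
final class RandomKeySketch {

    // Hypothetical minimal replacement for HBaseServerLog.
    static final class HBaseServerLogStub {
        private final Integer flumeId;
        HBaseServerLogStub(Integer flumeId) { this.flumeId = flumeId; }
        Integer getFlumeId() { return flumeId; }
    }

    private final Random r;
    private final int parallelism;

    RandomKeySketch(int parallelism, long seed) {
        this.parallelism = parallelism;
        this.r = new Random(seed); // fixed seed for repeatability; the original uses System.nanoTime()
    }

    // Same logic as getKey(...) in the second KeySelector.
    Integer getKey(HBaseServerLogStub log) {
        if (log == null) return 0;
        Integer flumeId;
        if ((flumeId = log.getFlumeId()) == null) return 1;
        return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
    }

    // Calls getKey on the *same* record `calls` times and collects every distinct key returned.
    static Set<Integer> distinctKeys(int flumeId, int parallelism, int calls, long seed) {
        RandomKeySketch selector = new RandomKeySketch(parallelism, seed);
        HBaseServerLogStub log = new HBaseServerLogStub(flumeId);
        Set<Integer> keys = new HashSet<>();
        for (int i = 0; i < calls; i++) {
            keys.add(selector.getKey(log));
        }
        return keys;
    }

    public static void main(String[] args) {
        // Prints a subset of {2, 3, 4, 5}: one record, several keys.
        System.out.println(distinctKeys(5, 4, 20, 42L));
    }
}
{code}

The key is therefore not a pure function of the record. Any code path that calls `getKey` more than once for the same record can see different keys.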

However, the key group is computed as `MathUtils.murmurHash(keyHash) % maxParallelism`, so any key should map to a valid key group. The problem only appears when we add `CEP` components (`IterativeCondition`/`PatternFlatSelectFunction`) after the `KeySelector`. They make the `KeySelector` choose the wrong channel, and an `IllegalArgumentException` is thrown.
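The sketch below models the key-group arithmetic in plain Java. It computes the key group, the subtask (channel) that owns it, each subtask's key-group range, and a range check. The formulas follow my reading of Flink's `KeyGroupRangeAssignment`. `murmurHash` is a MurmurHash3-style stand-in for `MathUtils.murmurHash` and is not copied from Flink, so the exact key-group numbers may differ. `maxParallelism = 128` and `parallelism = 8` are assumed values. With them, subtask 1 owns key groups 16 to 31, which is `[16, 32)`.

The sketch also assumes, as far as I understand Flink's internals, that a keyed operator downstream of `keyBy` applies the `KeySelector` again to each record and checks that the resulting key group is in its own range. A CEP operator would be one such keyed operator. This is unverified here. `KeyGroupSketch` and its methods are hypothetical names.

{code:java}
// Model of key-group routing; names and the exception message are illustrative, not Flink's.
final class KeyGroupSketch {

    // MurmurHash3-style mixing of one int, made non-negative (stand-in for MathUtils.murmurHash).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // fmix32 finalizer
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) return code;
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // keyGroup = murmurHash(key.hashCode()) % maxParallelism, always in [0, maxParallelism).
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Index of the downstream subtask (channel) that owns a key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [start, end] range of key groups owned by one subtask.
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Downstream check: a key re-extracted on a subtask must fall into that subtask's range.
    static void checkKey(Object key, int maxParallelism, int parallelism, int operatorIndex) {
        int keyGroup = assignToKeyGroup(key, maxParallelism);
        int[] range = keyGroupRange(maxParallelism, parallelism, operatorIndex);
        if (keyGroup < range[0] || keyGroup > range[1]) {
            throw new IllegalArgumentException("Key group " + keyGroup + " is not in range ["
                    + range[0] + ", " + (range[1] + 1) + ")");
        }
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 8; // assumed values
        int routedKey = 5;      // key computed when the record was partitioned
        int reExtractedKey = 4; // a different key that a second getKey(...) call could return
        int channel = operatorIndexForKeyGroup(maxParallelism, parallelism,
                assignToKeyGroup(routedKey, maxParallelism));
        checkKey(routedKey, maxParallelism, parallelism, channel); // always passes
        try {
            checkKey(reExtractedKey, maxParallelism, parallelism, channel);
            System.out.println("key 4 happens to land on the same subtask");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

In this model, the upstream channel choice is always valid. A failure can only come from the downstream check seeing a different key from the one used for routing.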


> `CEP` component make `KeyedStream` choose wrong channel
> -------------------------------------------------------
>
>                 Key: FLINK-7424
>                 URL: https://issues.apache.org/jira/browse/FLINK-7424
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, Streaming
>            Reporter: Benedict Jin
>            Assignee: Benedict Jin
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
