[ https://issues.apache.org/jira/browse/FLINK-7424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Benedict Jin updated FLINK-7424:
--------------------------------
    Description: 
The `CEP` component makes `KeyedStream` choose the wrong channel.

The original KeySelector works correctly.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
    return (KeySelector<HBaseServerLog, Integer>) log -> {
        if (log == null) return 0;
        Integer flumeId;
        if ((flumeId = log.getFlumeId()) == null) return 1;
        return flumeId;
    };
}
{code}

After the following change, it throws a "Key group index out of range of key group range [16, 32)" exception.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
    return new KeySelector<HBaseServerLog, Integer>() {

        private Random r = new Random(System.nanoTime());

        @Override
        public Integer getKey(HBaseServerLog log) throws Exception {
            if (log == null) return 0;
            Integer flumeId;
            if ((flumeId = log.getFlumeId()) == null) return 1;
            return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
        }
    };
}
{code}

However, after the MathUtils.murmurHash(keyHash) % maxParallelism step, the result should not be wrong. Actually, when we add some `CEP` component (IterativeCondition/PatternFlatSelectFunction) code after it, it makes the KeySelector choose the wrong channel and throw an IllegalArgumentException.

  was:
The `CEP` component makes `KeyedStream` choose the wrong channel.

The original KeySelector works correctly.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
    return (KeySelector<HBaseServerLog, Integer>) log -> {
        if (log == null) return 0;
        Integer flumeId;
        if ((flumeId = log.getFlumeId()) == null) return 1;
        return flumeId;
    };
}
{code}

After the following change, it throws a "Key group index out of range of key group range [16, 32)" exception.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
    return new KeySelector<HBaseServerLog, Integer>() {

        private Random r = new Random(System.nanoTime());

        @Override
        public Integer getKey(HBaseServerLog log) throws Exception {
            if (log == null) return 0;
            Integer flumeId;
            if ((flumeId = log.getFlumeId()) == null) return 1;
            return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
        }
    };
}
{code}

However, after the MathUtils.murmurHash(keyHash) % maxParallelism step, the result should not be wrong. Actually, when we add some `CEP` component (IterativeCondition/PatternFlatSelectFunction) code after it, it makes the {code.java}KeySelector{code} choose the wrong channel and throw an IllegalArgumentException.


> `CEP` component make `KeyedStream` choose wrong channel
> -------------------------------------------------------
>
>                 Key: FLINK-7424
>                 URL: https://issues.apache.org/jira/browse/FLINK-7424
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, Streaming
>            Reporter: Benedict Jin
>            Assignee: Benedict Jin
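
The description relies on the mapping MathUtils.murmurHash(keyHash) % maxParallelism. The following is a minimal, self-contained sketch of that mapping and of one possible reading of the report; it is not a confirmed diagnosis. It assumes that the key selector is evaluated once when a record is partitioned and again by the keyed operator that receives it, so a selector that returns a random key could yield two different key groups for the same record. The class `KeyGroupSketch`, the method names, the `mix` function (a stand-in bit mixer, not Flink's actual MathUtils.murmurHash), and the values parallelism = 8 and maxParallelism = 128 are all hypothetical and chosen only for illustration.

```java
import java.util.Random;
import java.util.function.IntUnaryOperator;

public class KeyGroupSketch {

    // Stand-in for a murmur-style hash (NOT Flink's implementation):
    // a deterministic bit mixer that always returns a non-negative int.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE;
    }

    // Key group of a key: hash(key) % maxParallelism, as quoted in the description.
    public static int keyGroupFor(int key, int maxParallelism) {
        return mix(Integer.hashCode(key)) % maxParallelism;
    }

    // Assumed mapping from a key group to the index of the subtask that owns it.
    public static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Evaluates the selector twice per record (assumed: once on the sending side,
    // once on the receiving side) and counts records whose two evaluations
    // map to different subtasks.
    public static int countMisrouted(IntUnaryOperator selector, int records,
                                     int parallelism, int maxParallelism) {
        int misrouted = 0;
        for (int flumeId = 0; flumeId < records; flumeId++) {
            int sender = subtaskFor(
                keyGroupFor(selector.applyAsInt(flumeId), maxParallelism),
                parallelism, maxParallelism);
            int receiver = subtaskFor(
                keyGroupFor(selector.applyAsInt(flumeId), maxParallelism),
                parallelism, maxParallelism);
            if (sender != receiver) {
                misrouted++;
            }
        }
        return misrouted;
    }

    public static void main(String[] args) {
        final int parallelism = 8;       // hypothetical value
        final int maxParallelism = 128;  // hypothetical value
        Random r = new Random(42);

        // Deterministic selector, like the first buildKeySelector() in the description.
        IntUnaryOperator stable = id -> id;
        // Randomized selector, same expression as in the second buildKeySelector(parallelism).
        IntUnaryOperator randomized =
            id -> Math.max(id + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);

        System.out.println("stable selector, misrouted records: "
            + countMisrouted(stable, 1000, parallelism, maxParallelism));
        System.out.println("randomized selector, misrouted records: "
            + countMisrouted(randomized, 1000, parallelism, maxParallelism));
    }
}
```

Under these assumed values, subtask 1 owns key groups 16 through 31, which has the same shape as the range [16, 32) in the reported exception. In this sketch the deterministic selector never disagrees with itself, while the randomized one can, which may be why the error shows up only once a keyed operator such as CEP re-evaluates the key downstream.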