[ https://issues.apache.org/jira/browse/FLINK-14763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Chen updated FLINK-14763:
-------------------------------
Fix Version/s:     (was: 1.10.2)
                   (was: 1.11.0)
                   1.12.0
> flink sql cep parallelism error
> ---------------------------------
>
> Key: FLINK-14763
> URL: https://issues.apache.org/jira/browse/FLINK-14763
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Environment: flink on yarn
> flink 1.10
> hadoop 3.0
> kafka 2.2.0
> Reporter: richt richt
> Priority: Major
> Fix For: 1.12.0
>
>
> When I submit a CEP SQL query through the SQL Client with a parallelism greater
> than 1, it fails with the following error:
> {code:java}
> java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216)
>     at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
>     at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
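> It seems that some keys are assigned to the wrong TaskManager: the subtask owns
> key groups 0 to 15, but an event-time timer was registered for key group 16.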
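The error message can be read against how Flink partitions keyed state: each parallel subtask owns a contiguous, inclusive range of key groups, and the heap timer queue rejects a key group outside that range. The Java sketch below is modeled on the range formulas in Flink's `KeyGroupRangeAssignment`, reproduced from memory, so treat them as an approximation. The class and method names are hypothetical, and `maxParallelism = 128` with `parallelism = 8` are assumed values picked only because they give subtask 0 the range 0 to 15 seen in the message; the report does not state the job's actual max parallelism.

```java
/**
 * Illustrative sketch, not Flink source code: a keyed subtask owns a contiguous,
 * inclusive range of key groups, and state or timers may only be registered for
 * key groups inside that range. Range formulas are modeled on Flink's
 * KeyGroupRangeAssignment (from memory); all names here are hypothetical.
 */
public class KeyGroupSketch {

    /** Hypothetical stand-in for an inclusive key-group range owned by one subtask. */
    static final class KeyGroupRange {
        final int startKeyGroup;
        final int endKeyGroup;

        KeyGroupRange(int startKeyGroup, int endKeyGroup) {
            this.startKeyGroup = startKeyGroup;
            this.endKeyGroup = endKeyGroup;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
        }

        @Override
        public String toString() {
            return "KeyGroupRange{startKeyGroup=" + startKeyGroup
                    + ", endKeyGroup=" + endKeyGroup + "}";
        }
    }

    /** Key groups assigned to subtask operatorIndex. */
    static KeyGroupRange rangeForSubtask(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

    /** Subtask that records of the given key group should be routed to. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Same kind of range check as the failing precondition in the stack trace. */
    static int localIndex(KeyGroupRange range, int keyGroup) {
        if (!range.contains(keyGroup)) {
            throw new IllegalArgumentException(range + " does not contain key group " + keyGroup);
        }
        return keyGroup - range.startKeyGroup;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value, chosen so subtask 0 owns 0..15
        int parallelism = 8;      // assumed value, not the 32 from the YAML

        KeyGroupRange subtask0 = rangeForSubtask(maxParallelism, parallelism, 0);
        System.out.println("subtask 0 owns " + subtask0);
        System.out.println("key group 16 belongs to subtask "
                + subtaskForKeyGroup(maxParallelism, parallelism, 16));

        try {
            // A timer for key group 16 registered on subtask 0 fails this check.
            localIndex(subtask0, 16);
        } catch (IllegalArgumentException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

Under these assumed values, `main` prints that subtask 0 owns key groups 0 to 15, that key group 16 belongs to subtask 1, and then reproduces the same message as the exception. The sketch only shows what the precondition checks; it does not explain why the CEP operator computed key group 16 on a subtask that does not own it.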
>
> The SQL Client YAML configuration is:
> {code:java}
> execution:
>   planner: blink
>   type: streaming
>   parallelism: 32
> ....
>   - name: Ticker
>     type: source-table
>     update-mode: append
>     connector:
>       sink-partitioner: round-robin
>       sink-partitioner-class: org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: test_part
>       startup-mode: earliest-offset
>       properties:
>         - key: bootstrap.servers
>           value: localhost:9092
>         - key: group.id
>           value: testGroup
>     format:
>       property-version: 1
>       type: json
>       derive-schema: true
>     schema:
>       - name: symbol
>         type: VARCHAR
>       - name: rtime
>         type: TIMESTAMP
>         rowtime:
>           timestamps:
>             type: from-field
>             from: rowtime
>           watermarks:
>             type: periodic-bounded
>             delay: 2000
>       - name: price
>         type: BIGINT
>       - name: tax
>         type: BIGINT
> {code}
> The query is taken from the demo:
> {code:java}
> SELECT *
> FROM Ticker
>     MATCH_RECOGNIZE (
>         PARTITION BY symbol
>         ORDER BY rtime
>         MEASURES
>             C.price AS lastPrice
>         ONE ROW PER MATCH
>         AFTER MATCH SKIP PAST LAST ROW
>         PATTERN (A B* C)
>         DEFINE
>             A AS A.price > 10,
>             B AS B.price < 15,
>             C AS C.price > 12
>     )
> {code}
> The input data:
> {code:java}
> symbol  rtime                price  tax
> ACME    2011-11-11T10:00     12     1
> ACME    2011-11-11T10:00:02  19     1
> ACME    2011-11-11T10:00:01  17     2
> ACME    2011-11-11T10:00:03  21     3
> ACME    2011-11-11T10:00:04  25     2
> ACME    2011-11-11T10:00:05  18     1
> ACME    2011-11-11T10:00:06  15     1
> ACME    2011-11-11T10:00:07  14     2
> ACME    2011-11-11T10:00:08  24     2
> ACME    2011-11-11T10:00:09  25     2
> ACME    2011-11-11T10:00:10  19     1
> {code}
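The `periodic-bounded` watermark with `delay: 2000` in the YAML interacts with this data, because the rows are not in `rtime` order (10:00:02 arrives before 10:00:01) and the stack trace shows the CEP operator registering an event-time timer while processing a row. The Java sketch below assumes a bounded-out-of-orderness strategy in which the watermark equals the largest timestamp seen so far minus the delay, derives a watermark after every row (a simplification of periodic emission), and counts a row as late when its timestamp is not greater than the current watermark; the exact boundary Flink 1.10 uses is an assumption, and all names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * Illustrative sketch: bounded-out-of-orderness watermarks, where the watermark
 * trails the largest timestamp seen so far by a fixed delay. Assumes a watermark
 * after every row and "late" meaning timestamp <= current watermark.
 */
public class BoundedWatermarkSketch {

    /** rtime values in the order the rows appear in the report. */
    static final String[] RTIMES = {
        "2011-11-11T10:00", "2011-11-11T10:00:02", "2011-11-11T10:00:01",
        "2011-11-11T10:00:03", "2011-11-11T10:00:04", "2011-11-11T10:00:05",
        "2011-11-11T10:00:06", "2011-11-11T10:00:07", "2011-11-11T10:00:08",
        "2011-11-11T10:00:09", "2011-11-11T10:00:10"
    };

    static long toMillis(String isoLocalDateTime) {
        // Interpreted as UTC; only differences between timestamps matter here.
        return LocalDateTime.parse(isoLocalDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Counts rows whose timestamp is at or behind the watermark when they arrive. */
    static int countLate(String[] rtimes, long delayMillis) {
        long maxTimestamp = Long.MIN_VALUE + delayMillis; // keeps max - delay from underflowing
        int late = 0;
        for (String rtime : rtimes) {
            long ts = toMillis(rtime);
            long watermark = maxTimestamp - delayMillis;
            if (ts <= watermark) {
                late++;
            }
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        System.out.println("late rows with delay 2000 ms: " + countLate(RTIMES, 2000));
        System.out.println("late rows with delay 0 ms: " + countLate(RTIMES, 0));
    }
}
```

Under these assumptions, no row is behind the watermark with the configured 2000 ms delay, while with no delay the 10:00:01 row would be. This does not by itself explain the key group error.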
--
This message was sent by Atlassian Jira
(v8.3.4#803005)