[
https://issues.apache.org/jira/browse/FLINK-38539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18031373#comment-18031373
]
Robert Metzger edited comment on FLINK-38539 at 10/21/25 9:05 AM:
------------------------------------------------------------------
This is my FlinkDeployment:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: tumbling
spec:
  image: flink:2.1.0-java17
  flinkVersion: v2_1
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: 4
    rest.profiling.enabled: "true"
    rest.flamegraph.enabled: "true"
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9250"
    metrics.reporters: prom
    web.cancel.enable: true
    kubernetes.taskmanager.cpu.limit-factor: "1"
    execution.checkpointing.interval: 60s
    state.checkpoints.dir: gs://flink/checkpoints
    high-availability.storageDir: gs://flink/ha
    high-availability.type: kubernetes
    taskmanager.memory.managed.fraction: "0.7"
    taskmanager.memory.network.min: 1mb
    taskmanager.memory.network.max: 8mb
    taskmanager.memory.jvm-metaspace.size: 150mb
    taskmanager.memory.task.off-heap.size: 1gb
    state.backend.type: hashmap
    # taskmanager.network.detailed-metrics: true
  serviceAccount: flink
  podTemplate:
    spec:
      volumes:
        - name: flink-usrlib
          emptyDir: {}
      initContainers:
        - name: jar-downloader
          image: google/cloud-sdk:latest # An image with gsutil
          command:
            - gsutil
            - cp
            - gs://flink/jars/tumbling.jar
            - /opt/flink/usrlib/job.jar
          volumeMounts:
            - name: flink-usrlib
              mountPath: /opt/flink/usrlib
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: "flink-gs-fs-hadoop-2.1.0.jar"
          volumeMounts:
            - name: flink-usrlib
              mountPath: /opt/flink/usrlib
          ports:
            - name: metrics
              containerPort: 9250
              protocol: TCP
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "20g"
      cpu: 6
  job:
    jarURI: local:///opt/flink/usrlib/job.jar
    entryClass: de.robertmetzger.SqlExecutor
    parallelism: 4
    upgradeMode: last-state
    args:
      - --brokers
      - kafka:9092
      - --in-topic
      - data
      - --out-topic
      - results
{code}
The error also happens with Flink 1.20.3.
> IndexOutOfBoundsException in AbstractSliceSyncStateWindowAggProcessor / AbstractBytesMultiMap
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-38539
> URL: https://issues.apache.org/jira/browse/FLINK-38539
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.1.0, 1.20.3
> Reporter: Robert Metzger
> Priority: Critical
>
> I'm running this Flink SQL query:
> {code:java}
> INSERT INTO SinkTable
> SELECT
>   `user_id`,
>   COUNT(*) AS `event_count`,
>   ROW(
>     LAST_VALUE(`timestamp`),
>     LAST_VALUE(`record_id`),
>     LAST_VALUE(`cat_id`),
>     LAST_VALUE(`group_id`),
>     LAST_VALUE(CAST(`payload` AS VARCHAR))
>   ) AS `last_record`
> FROM TABLE(
>   TUMBLE(TABLE SourceTable, DESCRIPTOR(proc_time), INTERVAL '5' MINUTES)
> )
> GROUP BY `user_id`, `window_start`, `window_end`;
> {code}
> It always fails with:
> {code:java}
> java.lang.IndexOutOfBoundsException: Index -65534 out of bounds for length 65655
>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
>     at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
>     at java.base/java.util.Objects.checkIndex(Unknown Source)
>     at java.base/java.util.ArrayList.get(Unknown Source)
>     at org.apache.flink.runtime.io.disk.RandomAccessInputView.setReadPosition(RandomAccessInputView.java:65)
>     at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.updateValuePointer(AbstractBytesMultiMap.java:329)
>     at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.updateValuePointerInValueArea(AbstractBytesMultiMap.java:323)
>     at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap.append(AbstractBytesMultiMap.java:155)
>     at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:89)
>     at org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractSliceSyncStateWindowAggProcessor.processElement(AbstractSliceSyncStateWindowAggProcessor.java:123)
>     at org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator.processElement(WindowAggOperator.java:219)
>     at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:64)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:245)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:206)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:163)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:980)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:942)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
>     at java.base/java.lang.Thread.run(Unknown Source)
> {code}
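
One possible reading of the two numbers in the exception message, offered as a hypothesis and not a confirmed root cause: assuming 32 KiB memory segments (15 offset bits per segment) and assuming the pointer into the record area is held in a 32-bit int, a list of 65655 segments covers slightly more than 2 GiB, so an offset inside segment 65538 would no longer fit into an int. The sketch below only reproduces that arithmetic. The class and method names are hypothetical, and the shift-and-cast in segmentIndex is an assumed stand-in for what setReadPosition does, not a copy of the Flink source.

```java
public class PointerOverflowSketch {
    // Assumption: 32 KiB memory segments, i.e. 15 offset bits per segment.
    static final int SEGMENT_SIZE_BITS = 15;

    /** Hypothetical stand-in: derive a segment index from a byte position by shift and cast. */
    static int segmentIndex(long position) {
        return (int) (position >>> SEGMENT_SIZE_BITS);
    }

    public static void main(String[] args) {
        // Start of segment 65538, which lies just past the 2 GiB boundary.
        long trueOffset = 65538L << SEGMENT_SIZE_BITS;
        // Assumption: the pointer is stored as an int, so it wraps to a negative value.
        int storedPointer = (int) trueOffset;

        System.out.println(segmentIndex(trueOffset));    // 65538
        System.out.println(storedPointer);               // -2147418112
        // The int is sign-extended to long before the unsigned shift.
        System.out.println(segmentIndex(storedPointer)); // -65534
    }
}
```

Under these assumptions the wrapped pointer maps to index -65534, matching the message, while the unwrapped index 65538 would have been within the reported length of 65655. Whether the actual code path overflows in this way would need to be checked against the Flink source.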