[ 
https://issues.apache.org/jira/browse/FLINK-38539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18031373#comment-18031373
 ] 

Robert Metzger commented on FLINK-38539:
----------------------------------------

This is my FlinkDeployment:

{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: tumbling
spec:
  image: flink:2.1.0-java17
  flinkVersion: v2_1
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: 4
    rest.profiling.enabled: "true"
    rest.flamegraph.enabled: "true"
    metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.factory.class: 
org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9250"
    metrics.reporters: prom
    web.cancel.enable: true
    kubernetes.taskmanager.cpu.limit-factor: "1"
    execution.checkpointing.interval: 60s
    state.checkpoints.dir: gs://flink/checkpoints
    high-availability.storageDir: gs://flink/ha
    high-availability.type: kubernetes
    taskmanager.memory.managed.fraction: "0.7"
    taskmanager.memory.network.min: 1mb
    taskmanager.memory.network.max: 8mb
    taskmanager.memory.jvm-metaspace.size: 150mb
    taskmanager.memory.task.off-heap.size: 1gb
    state.backend.type: hashmap
    # taskmanager.network.detailed-metrics: true
  serviceAccount: flink
  podTemplate:
    spec:
      volumes:
        - name: flink-usrlib
          emptyDir: {}
      initContainers:
        - name: jar-downloader
          image: google/cloud-sdk:latest # An image with gsutil
          command:
            - gsutil
            - cp
            - gs://flink/jars/tumbling.jar
            - /opt/flink/usrlib/job.jar
          volumeMounts:
            - name: flink-usrlib
              mountPath: /opt/flink/usrlib
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: "flink-gs-fs-hadoop-2.1.0.jar"
          volumeMounts:
            - name: flink-usrlib
              mountPath: /opt/flink/usrlib
          ports:
              - name: metrics 
                containerPort: 9250
                protocol: TCP
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "20g"
      cpu: 6
  job:
    jarURI: local:///opt/flink/usrlib/job.jar
    entryClass: de.robertmetzger.SqlExecutor
    parallelism: 4
    upgradeMode: last-state
    args:
      - --brokers
      - kafka:9092
      - --in-topic
      - data
      - --out-topic
      - results
{code}


> IndexOutOfBoundsException in AbstractSliceSyncStateWindowAggProcessor
> ---------------------------------------------------------------------
>
>                 Key: FLINK-38539
>                 URL: https://issues.apache.org/jira/browse/FLINK-38539
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.1.0
>            Reporter: Robert Metzger
>            Priority: Critical
>
> I'm running this Flink SQL query:
> {code:java}
> INSERT INTO SinkTable
> SELECT
>     `user_id`,
>     COUNT(*) AS `event_count`,
>     ROW(
>         LAST_VALUE(`timestamp`),
>         LAST_VALUE(`record_id`),
>         LAST_VALUE(`cat_id`),
>         LAST_VALUE(`group_id`),
>         LAST_VALUE(CAST(`payload` AS VARCHAR))
>     ) AS `last_record`
> FROM TABLE(
>     TUMBLE(TABLE SourceTable, DESCRIPTOR(proc_time), INTERVAL '5' MINUTES)
> )
> GROUP BY `user_id`, `window_start`, `window_end`;
> {code}
> Which always fails with 
> {code:java}
> java.lang.IndexOutOfBoundsException: Index -65534 out of bounds for length 
> 65655
>       at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
>       at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown 
> Source)
>       at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
>       at java.base/java.util.Objects.checkIndex(Unknown Source)
>       at java.base/java.util.ArrayList.get(Unknown Source)
>       at 
> org.apache.flink.runtime.io.disk.RandomAccessInputView.setReadPosition(RandomAccessInputView.java:65)
>       at 
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.updateValuePointer(AbstractBytesMultiMap.java:329)
>       at 
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.updateValuePointerInValueArea(AbstractBytesMultiMap.java:323)
>       at 
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap.append(AbstractBytesMultiMap.java:155)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:89)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractSliceSyncStateWindowAggProcessor.processElement(AbstractSliceSyncStateWindowAggProcessor.java:123)
>       at 
> org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator.processElement(WindowAggOperator.java:219)
>       at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:64)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:245)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:206)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:163)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:980)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:942)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
>       at java.base/java.lang.Thread.run(Unknown Source)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to