[
https://issues.apache.org/jira/browse/FLINK-32160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725660#comment-17725660
]
Michał Fijołek commented on FLINK-32160:
----------------------------------------
Thanks [~luoyuxia] - it looks like the same exception. In my case, though, the
parallelism does not change.
> CompactOperator cannot continue from checkpoint because of
> java.util.NoSuchElementException
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-32160
> URL: https://issues.apache.org/jira/browse/FLINK-32160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.16.0, 1.17.0
> Environment: Flink 1.16/1.17 on k8s (flink-kubernetes-operator
> v.1.4.0), s3
> Reporter: Michał Fijołek
> Priority: Major
>
> Hello :) We have a Flink job (v 1.17) on k8s (using the official
> flink-k8s-operator) that reads data from Kafka and writes it to s3 via
> Flink SQL with compaction enabled. The job sometimes fails and continues from
> a checkpoint just fine, but once every couple of days we hit a crash loop: the
> job cannot continue from the latest checkpoint and fails with the following
> exception:
> {noformat}
> java.util.NoSuchElementException
>     at java.base/java.util.ArrayList$Itr.next(Unknown Source)
>     at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.base/java.lang.Thread.run(Unknown Source){noformat}
> Here’s the relevant code:
> [https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]
> It looks like `CompactOperator` calls `next()` on the iterator without
> checking `hasNext()` first - why is that? Is it a bug? Why does
> `context.getOperatorStateStore().getListState(metaDescriptor)` return an empty
> iterator? Is the latest checkpoint broken in such a case?
> We have an identical job, but without compaction, and it has been working
> smoothly for a couple of weeks now.
> The whole job is just a `select` from Kafka and an `insert` into s3.
> {noformat}
> CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (
>   `foo_bar1` STRING,
>   `foo_bar2` STRING,
>   `foo_bar3` STRING,
>   `foo_bar4` STRING
> )
> PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
> STORED AS parquet
> LOCATION 's3a://my/bucket/'
> TBLPROPERTIES (
>   'auto-compaction' = 'true',
>   'compaction.file-size' = '128MB',
>   'sink.parallelism' = '8',
>   'format' = 'parquet',
>   'parquet.compression' = 'SNAPPY',
>   'sink.rolling-policy.rollover-interval' = '1 h',
>   'sink.partition-commit.policy.kind' = 'metastore'
> ){noformat}
> Checkpoint configuration:
> {noformat}
> Checkpointing Mode Exactly Once
> Checkpoint Storage FileSystemCheckpointStorage
> State Backend HashMapStateBackend
> Interval 20m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 0ms
> Maximum Concurrent Checkpoints 1
> Unaligned Checkpoints Disabled
> Persist Checkpoints Externally Enabled (retain on cancellation)
> Tolerable Failed Checkpoints 0
> Checkpoints With Finished Tasks Enabled
> State Changelog Disabled{noformat}
> Is there something wrong with the given config, or is this some unhandled edge
> case?
> Our current workaround is to restart the job without the checkpoint - it then
> picks up its state from Kafka, which is fine in this case.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)