[ 
https://issues.apache.org/jira/browse/FLINK-32160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725660#comment-17725660
 ] 

Michał Fijołek commented on FLINK-32160:
----------------------------------------

Thanks [~luoyuxia] - it looks like the same exception. In my case, though, the 
parallelism does not change.

> CompactOperator cannot continue from checkpoint because of 
> java.util.NoSuchElementException
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32160
>                 URL: https://issues.apache.org/jira/browse/FLINK-32160
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.0, 1.17.0
>         Environment: Flink 1.16/1.17 on k8s (flink-kubernetes-operator 
> v.1.4.0), s3
>            Reporter: Michał Fijołek
>            Priority: Major
>
> Hello :) We have a Flink job (v 1.17) on k8s (using the official 
> flink-kubernetes-operator) that reads data from Kafka and writes it to S3 via 
> Flink SQL with auto-compaction enabled. The job sometimes fails and restores 
> from a checkpoint just fine, but once every couple of days we hit a crash 
> loop: the job cannot restore from the latest checkpoint and fails with the 
> following exception:
> {noformat}
> java.util.NoSuchElementException at 
> java.base/java.util.ArrayList$Itr.next(Unknown Source)
>  at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
>  at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>  at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>  at java.base/java.lang.Thread.run(Unknown Source){noformat}
> Here’s the relevant code: 
> [https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]
> It looks like `CompactOperator` calls `next()` on the iterator without 
> checking `hasNext()` first - why is that? Is it a bug? Why does 
> `context.getOperatorStateStore().getListState(metaDescriptor)` return an empty 
> iterator? Is the latest checkpoint broken in such a case?
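> For illustration, below is a minimal sketch of the restore pattern in question 
> together with a defensive hasNext() guard. This is not the actual Flink source; 
> the class name, state descriptor name, and types are placeholders.
> {noformat}
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.runtime.state.StateInitializationContext;
> 
> import java.util.Collections;
> import java.util.Iterator;
> import java.util.List;
> 
> class CompactRestoreSketch {
> 
>     private List<String> restoredFiles;
> 
>     void initializeState(StateInitializationContext context) throws Exception {
>         // Placeholder descriptor; the real operator keeps compaction metadata here.
>         ListStateDescriptor<List<String>> metaDescriptor =
>                 new ListStateDescriptor<>("file-names", Types.LIST(Types.STRING));
>         ListState<List<String>> filesState =
>                 context.getOperatorStateStore().getListState(metaDescriptor);
> 
>         Iterator<List<String>> it = filesState.get().iterator();
> 
>         // Pattern matching the stack trace: calling next() on an empty iterator
>         // throws java.util.NoSuchElementException during restore.
>         // restoredFiles = it.next();
> 
>         // Defensive variant: treat missing/empty state as "nothing restored".
>         restoredFiles = it.hasNext() ? it.next() : Collections.emptyList();
>     }
> }{noformat}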
> We have an identical job, but without compaction, and it has been running 
> smoothly for a couple of weeks now. 
> The whole job is just a `SELECT` from Kafka and an `INSERT` into S3.
> {noformat}
> CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (
>   `foo_bar1` STRING,
>   `foo_bar2` STRING,
>   `foo_bar3` STRING,
>   `foo_bar4` STRING
>   )
>   PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
>   STORED AS parquet
>   LOCATION 's3a://my/bucket/'
>   TBLPROPERTIES (
>     'auto-compaction' = 'true',
>     'compaction.file-size' = '128MB',
>     'sink.parallelism' = '8',
>     'format' = 'parquet',
>     'parquet.compression' = 'SNAPPY',
>     'sink.rolling-policy.rollover-interval' = '1 h',
>     'sink.partition-commit.policy.kind' = 'metastore'
>   ){noformat}
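> For completeness, a sketch of the shape of such an insert-only job (the Kafka 
> source table name below is a placeholder, not taken from this report):
> {noformat}
> INSERT INTO hive.`foo`.`bar`
> SELECT `foo_bar1`, `foo_bar2`, `foo_bar3`, `foo_bar4`
> FROM `kafka_source_table`;{noformat}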
> Checkpoint configuration:
> {noformat}
> Checkpointing Mode: Exactly Once
> Checkpoint Storage: FileSystemCheckpointStorage
> State Backend: HashMapStateBackend
> Interval: 20m 0s
> Timeout: 10m 0s
> Minimum Pause Between Checkpoints: 0ms
> Maximum Concurrent Checkpoints: 1
> Unaligned Checkpoints: Disabled
> Persist Checkpoints Externally: Enabled (retain on cancellation)
> Tolerable Failed Checkpoints: 0
> Checkpoints With Finished Tasks: Enabled
> State Changelog: Disabled{noformat}
> Is there something wrong with the given config, or is this some unhandled 
> edge case? 
> Our current workaround is to restart the job without using the checkpoint - it 
> then resumes from the offsets committed to Kafka, which in this case is fine.
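> For context, that workaround relies on the Kafka source resuming from committed 
> consumer-group offsets. A hedged sketch of the relevant connector options 
> (standard Kafka SQL connector options; the actual source DDL is not part of 
> this report, and the table/topic names are placeholders):
> {noformat}
> CREATE TABLE `kafka_source_table` (
>   `foo_bar1` STRING,
>   `foo_bar2` STRING,
>   `foo_bar3` STRING,
>   `foo_bar4` STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'my-topic',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'my-consumer-group',
>   -- resume from committed group offsets when started without a checkpoint
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'json'
> );{noformat}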



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
