[ 
https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25478:
-----------------------------
        Parent: FLINK-21352
    Issue Type: Sub-task  (was: Bug)

> Same materialized state handle should register multi times
> ----------------------------------------------------------
>
>                 Key: FLINK-25478
>                 URL: https://issues.apache.org/jira/browse/FLINK-25478
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing, Runtime / State Backends
>            Reporter: Yun Tang
>            Priority: Critical
>             Fix For: 1.15.0
>
>
> Currently, changelog materialization calls the RocksDB state backend's
> snapshot method to generate an {{IncrementalRemoteKeyedStateHandle}} as the
> materialized artifact of ChangelogStateBackendHandleImpl. Until the next
> materialization, it always reports the same
> {{IncrementalRemoteKeyedStateHandle}} as before.
> Registering this handle the first time is fine. However, when the same
> {{IncrementalRemoteKeyedStateHandle}} is registered a second time (via
> {{ChangelogStateBackendHandleImpl#registerSharedStates}}), its private state
> artifacts are discarded without checking the registered reference:
> IncrementalRemoteKeyedStateHandle:
> {code:java}
> public void discardState() throws Exception {
>     try {
>         // Best-effort deletion of all private files (e.g. RocksDB's MANIFEST).
>         StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
>     } catch (Exception e) {
>         LOG.warn("Could not properly discard misc file states.", e);
>     }
> }
> {code}
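A minimal, self-contained Java sketch of the failure mode described above. `SketchHandle` and `NaiveRegistry` are hypothetical stand-ins, not Flink's `IncrementalRemoteKeyedStateHandle` or `SharedStateRegistry`; the remote file system is simulated as a `Set<String>` of file names, and the "registered reference" is assumed to be a map from handle ID to handle. The only point is that a registration path which treats every repeated key as a duplicate to discard will delete the private files of a handle that is merely being reported again.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a materialized handle; NOT a Flink class.
final class SketchHandle {
    final String id;                 // key under which the handle is registered
    final Set<String> privateFiles;  // e.g. MANIFEST, OPTIONS (illustrative names)
    final Set<String> storage;       // simulated remote file system

    SketchHandle(String id, Set<String> privateFiles, Set<String> storage) {
        this.id = id;
        this.privateFiles = new HashSet<>(privateFiles);
        this.storage = storage;
    }

    // Mirrors the quoted discardState(): deletes every private file.
    void discardState() {
        storage.removeAll(privateFiles);
    }
}

// Hypothetical registry reproducing the reported behaviour.
final class NaiveRegistry {
    private final Map<String, SketchHandle> registered = new HashMap<>();

    void register(SketchHandle handle) {
        SketchHandle existing = registered.putIfAbsent(handle.id, handle);
        // Buggy: any second registration under the same key is discarded as a
        // duplicate, even when it is the very same object that is registered.
        if (existing != null) {
            handle.discardState();
        }
    }
}
```

Registering the same `SketchHandle` object twice leaves the simulated storage empty, which is the state the restore path later trips over.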
> Thus, this deletes the private state (such as RocksDB's MANIFEST file), and
> once the job restores, it reports a FileNotFoundException:
>  
> {code:java}
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>     ... 10 more
> Caused by: java.io.FileNotFoundException: xxxxx
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ~[?:1.8.0_102]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) ~[?:1.8.0_102]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) ~[?:1.8.0_102]
>     ... 1 more {code}
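A hypothetical continuation of the same simplified model (all names are illustrative, not Flink APIs). `restore()` stands in for the RocksDB download step, which fails with a `FileNotFoundException` once a private file has been deleted, and `GuardedRegistry` sketches one possible direction suggested by the issue title: skip the discard when the incoming handle is the very object already registered. This is a sketch of the idea under those assumptions, not necessarily how the issue was fixed in Flink.

```java
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a materialized handle; NOT a Flink class.
final class GuardedHandle {
    final String id;                 // key under which the handle is registered
    final Set<String> privateFiles;  // e.g. MANIFEST, OPTIONS (illustrative names)
    final Set<String> storage;       // simulated remote file system

    GuardedHandle(String id, Set<String> privateFiles, Set<String> storage) {
        this.id = id;
        this.privateFiles = new HashSet<>(privateFiles);
        this.storage = storage;
    }

    // Deletes all private files from the simulated storage.
    void discardState() {
        storage.removeAll(privateFiles);
    }

    // Stand-in for downloading private files on restore: fails on the first missing file.
    void restore() throws FileNotFoundException {
        for (String file : privateFiles) {
            if (!storage.contains(file)) {
                throw new FileNotFoundException(file);
            }
        }
    }
}

// Hypothetical registry that tolerates re-registering the same handle.
final class GuardedRegistry {
    private final Map<String, GuardedHandle> registered = new HashMap<>();

    void register(GuardedHandle handle) {
        GuardedHandle existing = registered.putIfAbsent(handle.id, handle);
        // Same object reported again (e.g. before the next materialization):
        // nothing to do. Only a different object under the same key is
        // treated as a redundant duplicate and discarded.
        if (existing != null && existing != handle) {
            handle.discardState();
        }
    }
}
```

With the identity check, reporting the same handle on every checkpoint keeps its private files in place, so `restore()` succeeds; calling `discardState()` directly reproduces the missing-file failure seen in the stack trace.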



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
