[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073041#comment-17073041
 ] 

Yun Tang edited comment on FLINK-16770 at 4/1/20, 5:55 PM:
-----------------------------------------------------------

[~SleePy] Yes, this bug was introduced by FLINK-14971, and I can reproduce it 
locally with a unit test. There is a race condition between 
{{PendingCheckpoint#dispose}} and {{PendingCheckpoint#finalizeCheckpoint}}. 
The current {{operationLock}} only guarantees that the async phase that deletes 
this pending checkpoint and the step that adds the completed checkpoint do not 
run at the same time. However, it cannot prevent the pending checkpoint from 
first being added to the checkpoint store and then being dropped. 
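A minimal, hypothetical model of this race (not Flink's actual {{PendingCheckpoint}} code; the class, fields and methods below are illustrative only) shows why the lock alone is not enough. The lock makes the two async phases mutually exclusive, but running "add to store" and then "discard files" one after the other is still a schedule the lock permits, and it leaves a completed checkpoint in the store whose files are gone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, heavily simplified model; not Flink's actual PendingCheckpoint.
final class LockOnlyModel {
    final ReentrantLock operationLock = new ReentrantLock();
    final List<Long> completedStore = new ArrayList<>();
    boolean filesDeleted = false;

    // Async finalize phase: register the checkpoint as completed.
    void addToStore(long checkpointId) {
        operationLock.lock();
        try {
            completedStore.add(checkpointId);
        } finally {
            operationLock.unlock();
        }
    }

    // Async dispose phase: delete the checkpoint's files.
    void discardFiles() {
        operationLock.lock();
        try {
            filesDeleted = true;
        } finally {
            operationLock.unlock();
        }
    }

    // The lock only serializes the two phases; this order is still allowed.
    static LockOnlyModel addThenDiscard(long checkpointId) {
        LockOnlyModel m = new LockOnlyModel();
        m.addToStore(checkpointId);
        m.discardFiles();
        return m;
    }

    public static void main(String[] args) {
        LockOnlyModel m = addThenDiscard(1L);
        // A "completed" checkpoint whose files are gone: a restore would fail.
        System.out.println(m.completedStore + " filesDeleted=" + m.filesDeleted);
    }
}
```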

One quick fix would be to add an atomic boolean shared between these two async 
operations: once this pending checkpoint has been added to the checkpoint 
store, it would no longer be dropped asynchronously. However, this could lead 
to a misleading state: the pending checkpoint is successfully added to the 
checkpoint store asynchronously, but tagged as disposed in the main thread. 
Although we could avoid dropping it in the async phase of 
{{PendingCheckpoint#dispose}}, the checkpoint coordinator would not treat this 
pending checkpoint as successful, and it would not be displayed in the 
checkpoint web UI. Luckily, we could at least ensure that no data is deleted by 
mistake, and the job could still fail over by recovering from the latest 
completed checkpoint.
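A minimal sketch of this quick fix, as a hypothetical, simplified model (all names, including the {{addedToStore}} flag, are illustrative and not Flink's API). The existing lock is kept, and the async dispose phase checks the shared flag before deleting anything. The sketch only models the add-then-drop order described here and assumes the reverse order is handled elsewhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the "quick fix"; names are illustrative, not Flink's API.
final class QuickFixModel {
    final ReentrantLock operationLock = new ReentrantLock();      // existing mutual exclusion
    final AtomicBoolean addedToStore = new AtomicBoolean(false);  // proposed shared flag
    final List<Long> completedStore = new ArrayList<>();
    volatile boolean disposedInMainThread = false;
    volatile boolean filesDeleted = false;

    // Async finalize phase: add to the store, then publish the flag.
    void addToStore(long checkpointId) {
        operationLock.lock();
        try {
            completedStore.add(checkpointId);
            addedToStore.set(true);
        } finally {
            operationLock.unlock();
        }
    }

    // Main thread tags the checkpoint as disposed, then triggers the async
    // phase (run inline here for simplicity).
    void dispose() {
        disposedInMainThread = true;
        discardFilesAsync();
    }

    // Async dispose phase: skip deletion once the checkpoint is in the store.
    void discardFilesAsync() {
        operationLock.lock();
        try {
            if (!addedToStore.get()) {
                filesDeleted = true;
            }
        } finally {
            operationLock.unlock();
        }
    }

    public static void main(String[] args) {
        QuickFixModel m = new QuickFixModel();
        m.addToStore(1L);
        m.dispose();
        // Data survives, but the main thread still considers it disposed.
        System.out.println(m.completedStore + " filesDeleted=" + m.filesDeleted
                + " disposed=" + m.disposedInMainThread);
    }
}
```

Running {{main}} adds checkpoint 1 and then disposes it: the files survive, but {{disposedInMainThread}} is still {{true}}, which is the misleading state mentioned in the paragraph above.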

Another solution is to compare-and-set an atomic variable in the main thread 
in {{PendingCheckpoint#dispose}}, and to share that variable with the step that 
adds the checkpoint to the store. If adding the checkpoint to the store wins 
the race, the main thread would not be allowed to tag that pending checkpoint 
as discarded. Conversely, if tagging the pending checkpoint as discarded wins, 
we would not try to add it to the checkpoint store. I think this could be 
really lightweight and non-blocking, but it would introduce some extra CAS work 
in the main thread. What do you think of this? [~pnowojski]
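The CAS-based alternative can be sketched as a tiny state machine, again as a hypothetical model rather than Flink's actual code (the {{State}} enum and the method names are assumptions). Both paths try to move the same {{AtomicReference}} out of {{PENDING}}; only one {{compareAndSet}} can succeed, and the loser backs off.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the CAS proposal; names are illustrative, not Flink's API.
final class CasModel {
    enum State { PENDING, ADDING_TO_STORE, DISCARDED }

    final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
    final List<Long> completedStore = new ArrayList<>();
    volatile boolean filesDeleted = false;

    // Main thread (dispose): claim the checkpoint for discarding, or back off.
    boolean dispose() {
        if (state.compareAndSet(State.PENDING, State.DISCARDED)) {
            filesDeleted = true; // stands in for the async cleanup
            return true;
        }
        return false; // the store path won: do not tag it as discarded
    }

    // Finalize path: claim the checkpoint for the store, or back off.
    boolean tryAddToStore(long checkpointId) {
        if (state.compareAndSet(State.PENDING, State.ADDING_TO_STORE)) {
            completedStore.add(checkpointId);
            return true;
        }
        return false; // dispose won: never add a discarded checkpoint
    }

    public static void main(String[] args) throws InterruptedException {
        // Race both paths on separate threads; exactly one CAS can succeed.
        CasModel m = new CasModel();
        boolean[] won = new boolean[2];
        Thread disposer = new Thread(() -> won[0] = m.dispose());
        Thread finalizer = new Thread(() -> won[1] = m.tryAddToStore(1L));
        disposer.start();
        finalizer.start();
        disposer.join();
        finalizer.join();
        System.out.println("disposeWon=" + won[0] + " addWon=" + won[1]);
    }
}
```

Unlike the quick fix, the losing path never acts at all, so a checkpoint is never both in the store and tagged as discarded; the cost is one CAS on the main thread per dispose.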


> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16770
>                 URL: https://issues.apache.org/jira/browse/FLINK-16770
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Tests
>    Affects Versions: 1.11.0
>            Reporter: Zhijiang
>            Assignee: Yun Tang
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.11.0
>
>         Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log: 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also a similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case with no 
> parallelism change; this case is for scaling up. I am not quite sure whether 
> the root cause is the same.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==============================================================================
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> ------------------------------------------------------------
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z  at 
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890)
> 2020-03-25T06:50:58.4763591Z  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:963)
> 2020-03-25T06:50:58.4764274Z  at 
> java.security.AccessController.doPrivileged(Native Method)
> 2020-03-25T06:50:58.4764809Z  at 
> javax.security.auth.Subject.doAs(Subject.java:422)
> 2020-03-25T06:50:58.4765434Z  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> 2020-03-25T06:50:58.4766180Z  at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 2020-03-25T06:50:58.4773549Z  at 
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:963)
> 2020-03-25T06:50:58.4774502Z Caused by: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4775382Z  at 
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
> 2020-03-25T06:50:58.4776163Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
> 2020-03-25T06:50:58.4777706Z  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:90)
> 2020-03-25T06:50:58.4778334Z  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:58)
> 2020-03-25T06:50:58.4779007Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> 2020-03-25T06:50:58.4779654Z  at 
> org.apache.flink.streaming.tests.DataStreamAllroundTestProgram.main(DataStreamAllroundTestProgram.java:215)
> 2020-03-25T06:50:58.4780371Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-03-25T06:50:58.4784367Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-03-25T06:50:58.4785063Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-03-25T06:50:58.4785557Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-03-25T06:50:58.4786204Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> 2020-03-25T06:50:58.4786547Z  ... 11 more
> 2020-03-25T06:50:58.4787007Z Caused by: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4787717Z  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-03-25T06:50:58.4788203Z  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-03-25T06:50:58.4788835Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
> 2020-03-25T06:50:58.4789362Z  ... 20 more
> 2020-03-25T06:50:58.4789720Z Caused by: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4790467Z  at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> 2020-03-25T06:50:58.4791087Z  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> 2020-03-25T06:50:58.4791650Z  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> 2020-03-25T06:50:58.4792560Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-03-25T06:50:58.4793617Z  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2020-03-25T06:50:58.4794496Z  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> 2020-03-25T06:50:58.4795255Z  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2020-03-25T06:50:58.4796264Z  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2020-03-25T06:50:58.4796867Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-03-25T06:50:58.4797439Z  at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
> 2020-03-25T06:50:58.4798000Z  at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
> 2020-03-25T06:50:58.4798589Z  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> 2020-03-25T06:50:58.4799162Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-03-25T06:50:58.4799727Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-03-25T06:50:58.4800210Z  at java.lang.Thread.run(Thread.java:748)
> 2020-03-25T06:50:58.4800767Z Caused by: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error., <Exception on server side:
> 2020-03-25T06:50:58.4801351Z 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
> 2020-03-25T06:50:58.4801938Z  at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> 2020-03-25T06:50:58.4803660Z  at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> 2020-03-25T06:50:58.4804555Z  at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
> 2020-03-25T06:50:58.4805235Z  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> 2020-03-25T06:50:58.4805839Z  at 
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 2020-03-25T06:50:58.4806515Z  at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 2020-03-25T06:50:58.4807184Z  at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2020-03-25T06:50:58.4807807Z  at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2020-03-25T06:50:58.4808417Z  at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2020-03-25T06:50:58.4809055Z  at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-03-25T06:50:58.4809783Z Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
> 2020-03-25T06:50:58.4810756Z  at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> 2020-03-25T06:50:58.4811444Z  at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> 2020-03-25T06:50:58.4811937Z  ... 6 more
> 2020-03-25T06:50:58.4812414Z Caused by: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
> 2020-03-25T06:50:58.4813330Z  at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
> 2020-03-25T06:50:58.4814154Z  at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> 2020-03-25T06:50:58.4814846Z  at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> 2020-03-25T06:50:58.4815622Z  at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> 2020-03-25T06:50:58.4816074Z  ... 7 more
> 2020-03-25T06:50:58.4816924Z Caused by: java.io.IOException: Cannot access 
> file system for checkpoint/savepoint path 'file://.'.
> 2020-03-25T06:50:58.4817673Z  at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
> 2020-03-25T06:50:58.4818450Z  at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
> 2020-03-25T06:50:58.4819276Z  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1312)
> 2020-03-25T06:50:58.4819943Z  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
> 2020-03-25T06:50:58.4820633Z  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
> 2020-03-25T06:50:58.4821258Z  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:223)
> 2020-03-25T06:50:58.4821862Z  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:118)
> 2020-03-25T06:50:58.4822505Z  at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> 2020-03-25T06:50:58.4823115Z  at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:282)
> 2020-03-25T06:50:58.4823665Z  at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:270)
> 2020-03-25T06:50:58.4824485Z  at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> 2020-03-25T06:50:58.4825597Z  at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> 2020-03-25T06:50:58.4826400Z  at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
> 2020-03-25T06:50:58.4826919Z  ... 10 more
> 2020-03-25T06:50:58.4829018Z Caused by: java.io.IOException: Found local file 
> path with authority '.' in path 'file://.'. Hint: Did you forget a slash? 
> (correct path would be 'file:///.')
> 2020-03-25T06:50:58.4829875Z  at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
> 2020-03-25T06:50:58.4830364Z  at 
> org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> 2020-03-25T06:50:58.4830807Z  at 
> org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
> 2020-03-25T06:50:58.4831408Z  at 
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
> 2020-03-25T06:50:58.4832021Z  ... 22 more
> 2020-03-25T06:50:58.4832151Z 
> 2020-03-25T06:50:58.4832356Z End of exception on server side>]
> 2020-03-25T06:50:58.4832720Z  at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
> 2020-03-25T06:50:58.4833238Z  at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
> 2020-03-25T06:50:58.4833884Z  at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
> 2020-03-25T06:50:58.4834376Z  at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
> 2020-03-25T06:50:58.4834724Z  ... 4 more
> 2020-03-25T06:50:58.5042321Z Resuming from externalized checkpoint job could 
> not be started.
> 2020-03-25T06:50:58.5044210Z [FAIL] Test script contains errors.
> 2020-03-25T06:50:58.5052826Z Checking of logs skipped.
> 2020-03-25T06:50:58.5053164Z 
> 2020-03-25T06:50:58.5054116Z [FAIL] 'Resuming Externalized Checkpoint (rocks, 
> incremental, scale up) end-to-end test' failed after 0 minutes and 27 
> seconds! Test exited with exit code 1
> 2020-03-25T06:50:58.5054639Z 
> 2020-03-25T06:50:58.8067813Z Stopping taskexecutor daemon (pid: 86888) on 
> host fv-az655.
> 2020-03-25T06:50:59.0257270Z Stopping standalonesession daemon (pid: 86603) 
> on host fv-az655.
> 2020-03-25T06:50:59.4920994Z 
> 2020-03-25T06:50:59.5000014Z ##[error]Bash exited with code '1'.
> 2020-03-25T06:50:59.5015374Z ##[section]Finishing: Run e2e tests
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)