[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

Let the JobMaster respect checkpoints and savepoints. The JobMaster will always
try to restore the latest checkpoint if there is one available. If no checkpoint
could be restored, it will next check whether savepoint restore settings have
been set. If so, then it will try to restore the savepoint. Only if these
settings are not set either will the job be started from scratch.

This closes #5444.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a4e8964
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a4e8964
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a4e8964

Branch: refs/heads/master
Commit: 3a4e89643d7d7642dde9b5644491f261d4d545bd
Parents: 9e85bb0
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 13 15:19:18 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Feb 18 10:12:55 2018 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  23 ++
 .../TestingCheckpointRecoveryFactory.java       |  46 ++++
 .../jobmanager/JobManagerHARecoveryTest.java    |  24 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  | 217 ++++++++++++++++---
 5 files changed, 260 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index ed3570a..016defb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -966,7 +966,7 @@ public class CheckpointCoordinator {
         * Restores the latest checkpointed state.
         *
         * @param tasks Map of job vertices to restore. State for these 
vertices is
-        * restored via {@link Execution#setInitialState(TaskStateSnapshot)}.
+        * restored via {@link 
Execution#setInitialState(JobManagerTaskRestore)}.
         * @param errorIfNoCheckpoint Fail if no completed checkpoint is 
available to
         * restore from.
         * @param allowNonRestoredState Allow checkpoint state that cannot be 
mapped
@@ -1065,7 +1065,7 @@ public class CheckpointCoordinator {
         *                         mapped to any job vertex in tasks.
         * @param tasks            Map of job vertices to restore. State for 
these
         *                         vertices is restored via
-        *                         {@link 
Execution#setInitialState(TaskStateSnapshot)}.
+        *                         {@link 
Execution#setInitialState(JobManagerTaskRestore)}.
         * @param userClassLoader  The class loader to resolve serialized 
classes in
         *                         legacy savepoint versions.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index dfa4d1c..2a4b881 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -57,6 +57,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
@@ -297,6 +298,28 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        jobMasterConfiguration.getSlotRequestTimeout(),
                        log);
 
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+
+               if (checkpointCoordinator != null) {
+                       // check whether we find a valid checkpoint
+                       if 
(!checkpointCoordinator.restoreLatestCheckpointedState(
+                               executionGraph.getAllVertices(),
+                               false,
+                               false)) {
+
+                               // check whether we can restore from a savepoint
+                               final SavepointRestoreSettings 
savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+
+                               if 
(savepointRestoreSettings.restoreSavepoint()) {
+                                       checkpointCoordinator.restoreSavepoint(
+                                               
savepointRestoreSettings.getRestorePath(),
+                                               
savepointRestoreSettings.allowNonRestoredState(),
+                                               executionGraph.getAllVertices(),
+                                               userCodeLoader);
+                               }
+                       }
+               }
+
                // register self as job status change listener
                executionGraph.registerJobStatusListener(new 
JobManagerJobStatusListener());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..7bc0c85
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Simple {@link CheckpointRecoveryFactory} which is initialized with a
+ * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}.
+ */
+public class TestingCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
+
+       private final CompletedCheckpointStore store;
+       private final CheckpointIDCounter counter;
+
+       public TestingCheckpointRecoveryFactory(CompletedCheckpointStore store, 
CheckpointIDCounter counter) {
+               this.store = store;
+               this.counter = counter;
+       }
+
+       @Override
+       public CompletedCheckpointStore createCheckpointStore(JobID jobId, int 
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
+               return store;
+       }
+
+       @Override
+       public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) 
throws Exception {
+               return counter;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 93cde3a..8f1e12c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -177,7 +178,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                        submittedJobGraphStore.start(null);
                        CompletedCheckpointStore checkpointStore = new 
RecoverableCompletedCheckpointStore();
                        CheckpointIDCounter checkpointCounter = new 
StandaloneCheckpointIDCounter();
-                       CheckpointRecoveryFactory checkpointStateFactory = new 
MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
+                       CheckpointRecoveryFactory checkpointStateFactory = new 
TestingCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
                        TestingLeaderElectionService myLeaderElectionService = 
new TestingLeaderElectionService();
                        TestingLeaderRetrievalService myLeaderRetrievalService 
= new TestingLeaderRetrievalService(
                                null,
@@ -465,27 +466,6 @@ public class JobManagerHARecoveryTest extends TestLogger {
                }
        }
 
-       static class MyCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
-
-               private final CompletedCheckpointStore store;
-               private final CheckpointIDCounter counter;
-
-               public MyCheckpointRecoveryFactory(CompletedCheckpointStore 
store, CheckpointIDCounter counter) {
-                       this.store = store;
-                       this.counter = counter;
-               }
-
-               @Override
-               public CompletedCheckpointStore createCheckpointStore(JobID 
jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws 
Exception {
-                       return store;
-               }
-
-               @Override
-               public CheckpointIDCounter createCheckpointIDCounter(JobID 
jobId) throws Exception {
-                       return counter;
-               }
-       }
-
        public static class BlockingInvokable extends AbstractInvokable {
 
                private static final OneShotLatch LATCH = new OneShotLatch();

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e142d9c..e401020 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -25,13 +25,25 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -40,6 +52,8 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -57,7 +71,12 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -162,21 +181,7 @@ public class JobMasterTest extends TestLogger {
                final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
                final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
 
-               final JobMaster jobMaster = new JobMaster(
-                       rpcService,
-                       jobMasterConfiguration,
-                       jmResourceId,
-                       jobGraph,
-                       haServices,
-                       jobManagerSharedServices,
-                       fastHeartbeatServices,
-                       blobServer,
-                       null,
-                       new NoOpOnCompletionActions(),
-                       testingFatalErrorHandler,
-                       JobMasterTest.class.getClassLoader(),
-                       null,
-                       null);
+               final JobMaster jobMaster = 
createJobMaster(jobMasterConfiguration, jobGraph, haServices, 
jobManagerSharedServices);
 
                CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
@@ -237,21 +242,7 @@ public class JobMasterTest extends TestLogger {
                final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
                final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
 
-               final JobMaster jobMaster = new JobMaster(
-                       rpcService,
-                       jobMasterConfiguration,
-                       jmResourceId,
-                       jobGraph,
-                       haServices,
-                       jobManagerSharedServices,
-                       fastHeartbeatServices,
-                       blobServer,
-                       null,
-                       new NoOpOnCompletionActions(),
-                       testingFatalErrorHandler,
-                       JobMasterTest.class.getClassLoader(),
-                       null,
-                       null);
+               final JobMaster jobMaster = 
createJobMaster(jobMasterConfiguration, jobGraph, haServices, 
jobManagerSharedServices);
 
                CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
@@ -282,6 +273,152 @@ public class JobMasterTest extends TestLogger {
        }
 
        /**
+        * Tests that a JobMaster will restore the given JobGraph from its 
savepoint upon
+        * initial submission.
+        */
+       @Test
+       public void testRestoringFromSavepoint() throws Exception {
+
+               // create savepoint data
+               final long savepointId = 42L;
+               final File savepointFile = createSavepoint(savepointId);
+
+               // set savepoint settings
+               final SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.forPath(
+                       savepointFile.getAbsolutePath(),
+                       true);
+               final JobGraph jobGraph = 
createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+               final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+               final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new 
TestingCheckpointRecoveryFactory(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+               
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+               final JobMaster jobMaster = createJobMaster(
+                       JobMasterConfiguration.fromConfiguration(configuration),
+                       jobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build());
+
+               try {
+                       // starting the JobMaster should have read the savepoint
+                       final CompletedCheckpoint savepointCheckpoint = 
completedCheckpointStore.getLatestCheckpoint();
+
+                       assertThat(savepointCheckpoint, 
Matchers.notNullValue());
+
+                       assertThat(savepointCheckpoint.getCheckpointID(), 
Matchers.is(savepointId));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       /**
+        * Tests that an existing checkpoint will have precedence over a 
savepoint.
+        */
+       @Test
+       public void testCheckpointPrecedesSavepointRecovery() throws Exception {
+
+               // create savepoint data
+               final long savepointId = 42L;
+               final File savepointFile = createSavepoint(savepointId);
+
+               // set savepoint settings
+               final SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.forPath("" +
+                               savepointFile.getAbsolutePath(),
+                       true);
+               final JobGraph jobGraph = 
createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+               final long checkpointId = 1L;
+
+               final CompletedCheckpoint completedCheckpoint = new 
CompletedCheckpoint(
+                       jobGraph.getJobID(),
+                       checkpointId,
+                       1L,
+                       1L,
+                       Collections.emptyMap(),
+                       null,
+                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                       new DummyCheckpointStorageLocation());
+
+               final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+               completedCheckpointStore.addCheckpoint(completedCheckpoint);
+               final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new 
TestingCheckpointRecoveryFactory(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+               
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+               final JobMaster jobMaster = createJobMaster(
+                       JobMasterConfiguration.fromConfiguration(configuration),
+                       jobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build());
+
+               try {
+                       // starting the JobMaster should have restored the checkpoint, not the savepoint
+                       final CompletedCheckpoint savepointCheckpoint = 
completedCheckpointStore.getLatestCheckpoint();
+
+                       assertThat(savepointCheckpoint, 
Matchers.notNullValue());
+
+                       assertThat(savepointCheckpoint.getCheckpointID(), 
Matchers.is(checkpointId));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       private File createSavepoint(long savepointId) throws IOException {
+               final File savepointFile = temporaryFolder.newFile();
+               final SavepointV2 savepoint = new SavepointV2(savepointId, 
Collections.emptyList(), Collections.emptyList());
+
+               try (FileOutputStream fileOutputStream = new 
FileOutputStream(savepointFile)) {
+                       Checkpoints.storeCheckpointMetadata(savepoint, 
fileOutputStream);
+               }
+
+               return savepointFile;
+       }
+
+       @Nonnull
+       private JobGraph 
createJobGraphWithCheckpointing(SavepointRestoreSettings 
savepointRestoreSettings) {
+               final JobGraph jobGraph = new JobGraph();
+
+               // enable checkpointing which is required to resume from a 
savepoint
+               final CheckpointCoordinatorConfiguration 
checkpoinCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
+                       1000L,
+                       1000L,
+                       1000L,
+                       1,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true);
+               final JobCheckpointingSettings checkpointingSettings = new 
JobCheckpointingSettings(
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       checkpoinCoordinatorConfiguration,
+                       null);
+               jobGraph.setSnapshotSettings(checkpointingSettings);
+               jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+
+               return jobGraph;
+       }
+
+       @Nonnull
+       private JobMaster createJobMaster(
+                       JobMasterConfiguration jobMasterConfiguration,
+                       JobGraph jobGraph,
+                       HighAvailabilityServices highAvailabilityServices,
+                       JobManagerSharedServices jobManagerSharedServices) 
throws Exception {
+               return new JobMaster(
+                       rpcService,
+                       jobMasterConfiguration,
+                       jmResourceId,
+                       jobGraph,
+                       highAvailabilityServices,
+                       jobManagerSharedServices,
+                       fastHeartbeatServices,
+                       blobServer,
+                       null,
+                       new NoOpOnCompletionActions(),
+                       testingFatalErrorHandler,
+                       JobMasterTest.class.getClassLoader(),
+                       null,
+                       null);
+       }
+
+       /**
         * No op implementation of {@link OnCompletionActions}.
         */
        private static final class NoOpOnCompletionActions implements 
OnCompletionActions {
@@ -297,4 +434,24 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       private static final class DummyCheckpointStorageLocation implements 
CompletedCheckpointStorageLocation {
+
+               private static final long serialVersionUID = 
164095949572620688L;
+
+               @Override
+               public String getExternalPointer() {
+                       return null;
+               }
+
+               @Override
+               public StreamStateHandle getMetadataHandle() {
+                       return null;
+               }
+
+               @Override
+               public void disposeStorageLocation() throws IOException {
+
+               }
+       }
+
 }

Reply via email to