This is an automated email from the ASF dual-hosted git repository.
hsaputra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 938bb7f [STREAM STORAGE] Ensure progress while restoring from
checkpoint.
938bb7f is described below
commit 938bb7f38551c6d3d9f94af43703b9d65785e973
Author: Surinder Singh <[email protected]>
AuthorDate: Fri Aug 27 11:01:10 2021 -0700
[STREAM STORAGE] Ensure progress while restoring from checkpoint.
### Motivation
Sometimes the checkpoint restoration never completes. From the logs, it
appears to be stuck waiting for entries in distributedlog, even though all
the entries have already been read. This looks like a race condition that
causes the reader to believe that there are more entries.
Although this can happen for any storage container, it is catastrophic when
it happens for the root storage container: in that case all API calls start
to fail.
### Changes
This happens very rarely and is difficult to debug. Here we add a
protection around this deadlock. We monitor the input stream to check that
new data keeps arriving. If no new data is received within a configurable
idle limit, the restore process fails, which in turn fails the start of the
storage container. After the failure, the storage container is started
again.
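The idle check described above can be sketched roughly as follows. This is a
minimal, JDK-only illustration and not the code in this commit: the class
`IdleLimitCopy`, the helper `ProgressStream` and the method
`copyWithIdleLimit` are hypothetical names, and the hand-written byte counter
stands in for the Guava `CountingInputStream` that the actual change uses.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: copy a stream to a file, failing if no bytes arrive
// within one idle window.
final class IdleLimitCopy {

    // Counts bytes read so that the waiting thread can observe progress.
    static final class ProgressStream extends FilterInputStream {
        private final AtomicLong count = new AtomicLong();

        ProgressStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b >= 0) {
                count.incrementAndGet();
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) {
                count.addAndGet(n);
            }
            return n;
        }

        long getCount() {
            return count.get();
        }
    }

    static long copyWithIdleLimit(InputStream source, Path target, Duration idleLimit)
            throws IOException, TimeoutException {
        ProgressStream in = new ProgressStream(source);
        // Run the blocking copy on another thread so this thread can watch it.
        CompletableFuture<Long> copy = CompletableFuture.supplyAsync(() -> {
            try {
                return Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        });
        while (true) {
            long before = in.getCount();
            try {
                // Returns as soon as the copy completes within the window.
                return copy.get(idleLimit.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while copying", e);
            } catch (ExecutionException e) {
                throw new IOException("Copy failed", e.getCause());
            } catch (TimeoutException e) {
                // Window elapsed: give up only if the byte count did not move.
                if (in.getCount() == before) {
                    throw new TimeoutException("No progress after " + before + " bytes");
                }
            }
        }
    }
}
```

Note that in this sketch a timeout only stops the waiting thread. The
background copy stays blocked on the stalled stream until that stream is
closed or returns, so a caller would probably want to close the source after
a timeout.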
I ran into this issue during my testing and was able to confirm that we are
able to recover from it (log below, newest entries first).
```
2021-08-12T11:47:39.839+0000
org.apache.distributedlog.ReadAheadEntryReader ReadAhead for
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004:<default>
is caught up and no log segments to read now
2021-08-12T11:47:39.834+0000
org.apache.distributedlog.logsegment.PerStreamLogSegmentCache
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004
added log segment (logrecs_000000000000000001 : [LogSegmentId:2534,
firstTxId:99, lastTxId:99, version:VERSION_V5_SEQUENCE_ID,
completionTime:1628793488256, recordCount:1, regionId:0, status:0,
logSegmentSequenceNumber:1, lastEntryId:0, lastSlotId:0, inprogress:false, mi
[...]
2021-08-12T11:47:39.834+0000
org.apache.distributedlog.ReadAheadEntryReader Starting the readahead entry
reader for
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004:<default>
: number of segments: 1, top 10 segments = [[LogSegmentId:2534, firstTxId:99,
lastTxId:99, version:VERSION_V5_SEQUENCE_ID, completionTime:1628793488256,
recordCount:1, regionId:0, status:0, logSegmentSequenceNumber:1, lastEntryId:0,
last [...]
2021-08-12T11:47:39.834+0000
org.apache.distributedlog.logsegment.PerStreamLogSegmentCache
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004
added log segment (logrecs_000000000000000001 : [LogSegmentId:2534,
firstTxId:99, lastTxId:99, version:VERSION_V5_SEQUENCE_ID,
completionTime:1628793488256, recordCount:1, regionId:0, status:0,
logSegmentSequenceNumber:1, lastEntryId:0, lastSlotId:0, inprogress:false, mi
[...]
2021-08-12T11:47:29.516+0000
org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager Failed
to start storage container (0)
2021-08-12T11:47:29.513+0000
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl
De-registered StorageContainer ('0') when failed to start
2021-08-12T11:47:29.511+0000
org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore Timeout waiting for
checkpoint restore: Checkpoint{ID='cea4e286-5224-43ac-b6e0-7321d607f98e',
createdAt: 1628793488360 files: "CURRENT"\nfiles: "MANIFEST-000004"\nfiles:
"OPTIONS-000008"\ntxid: "\\000\\000\\000\\000\\000\\000\\000\\000"\ncreated_at:
1628793488360\nfileInfos {\n name: "CURRENT"\n checksum:
"0861415cada612ea5834d56e2cf1055d3e63979b69eb71d32ae9ae394d8306cd"\n}\nfileInfos
{\n name: [...]
2021-08-12T11:47:29.489+0000
org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointFile Timeout
waiting for copy:
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004
last-read 0 current-read 0 runtime(ms) 300000
2021-08-12T11:42:29.467+0000
org.apache.distributedlog.ReadAheadEntryReader Starting the readahead entry
reader for
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004:<default>
: number of segments: 1, top 10 segments = [[LogSegmentId:2534, firstTxId:99,
lastTxId:99, version:VERSION_V5_SEQUENCE_ID, completionTime:1628793488256,
recordCount:1, regionId:0, status:0, logSegmentSequenceNumber:1, lastEntryId:0,
last [...]
2021-08-12T11:42:29.466+0000
org.apache.distributedlog.logsegment.PerStreamLogSegmentCache
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004
added log segment (logrecs_000000000000000001 : [LogSegmentId:2534,
firstTxId:99, lastTxId:99, version:VERSION_V5_SEQUENCE_ID,
completionTime:1628793488256, recordCount:1, regionId:0, status:0,
logSegmentSequenceNumber:1, lastEntryId:0, lastSlotId:0, inprogress:false, mi
[...]
2021-08-12T11:42:29.466+0000
org.apache.distributedlog.logsegment.PerStreamLogSegmentCache
000000000000000000/000000000000000000/000000000000000000/checkpoints/cea4e286-5224-43ac-b6e0-7321d607f98e/MANIFEST-000004
added log segment (logrecs_000000000000000001 : [LogSegmentId:2534,
firstTxId:99, lastTxId:99, version:VERSION_V5_SEQUENCE_ID,
completionTime:1628793488256, recordCount:1, regionId:0, status:0,
logSegmentSequenceNumber:1, lastEntryId:0, lastSlotId:0, inprogress:false, mi
[...]
```
Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli
<[email protected]>
This closes #2764 from sursingh/statestore-restore-timeout and squashes the
following commits:
0b5cbeed4 [Surinder Singh] Change default timeout to 1 Minute.
38fe39f02 [Surinder Singh] Ensure progress while restoring from checkpoint.
---
.../bookkeeper/statelib/api/StateStoreSpec.java | 4 +
.../statelib/impl/kv/RocksdbKVStore.java | 8 +-
.../impl/rocksdb/checkpoint/CheckpointFile.java | 51 ++++++++--
.../impl/rocksdb/checkpoint/CheckpointInfo.java | 20 +++-
.../impl/rocksdb/checkpoint/RocksCheckpointer.java | 5 +-
.../rocksdb/checkpoint/RocksdbRestoreTask.java | 17 +++-
.../statelib/impl/kv/TestStateStore.java | 26 ++++-
.../rocksdb/checkpoint/RocksCheckpointerTest.java | 111 ++++++++++++++++++++-
.../stream/storage/conf/StorageConfiguration.java | 6 ++
.../storage/impl/store/MVCCStoreFactoryImpl.java | 2 +
10 files changed, 227 insertions(+), 23 deletions(-)
diff --git
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java
index 0ec0d7d..49387eb 100644
---
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java
+++
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java
@@ -58,4 +58,8 @@ public class StateStoreSpec {
@Default
private boolean localStorageCleanupEnable = false;
+
+ @Default
+ // Max idle time while waiting to restore from a checkpoint.
+ private Duration checkpointRestoreIdleLimit = Duration.ofMinutes(5);
}
diff --git
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
index 56aaae5..fa870e3 100644
---
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
+++
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
@@ -48,6 +48,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
@@ -160,7 +161,7 @@ public class RocksdbKVStore<K, V> implements KVStore<K, V> {
return this.name;
}
- private void loadRocksdbFromCheckpointStore(StateStoreSpec spec) {
+ private void loadRocksdbFromCheckpointStore(StateStoreSpec spec) throws
StateStoreException {
checkNotNull(spec.getCheckpointIOScheduler(),
"checkpoint io scheduler is not configured");
checkNotNull(spec.getCheckpointDuration(),
@@ -172,12 +173,15 @@ public class RocksdbKVStore<K, V> implements KVStore<K,
V> {
List<CheckpointInfo> checkpoints =
RocksCheckpointer.getCheckpoints(dbName, spec.getCheckpointStore());
for (CheckpointInfo cpi : checkpoints) {
try {
- cpi.restore(dbName, localStorePath, spec.getCheckpointStore());
+ cpi.restore(dbName, localStorePath, spec.getCheckpointStore(),
spec.getCheckpointRestoreIdleLimit());
openRocksdb(spec);
checkpoints.stream()
.filter(cp -> cp != cpi) // ignore the current restored
checkpoint
.forEach(cp -> cp.remove(localStorePath)); // delete
everything else
break;
+ } catch (TimeoutException e) {
+ log.error("Timeout waiting for checkpoint restore: {}", cpi,
e);
+ throw new StateStoreException("Failed to restore checkpoint: "
+ cpi.getId(), e);
} catch (StateStoreException e) {
// Got an exception. Log and try the next checkpoint
log.error("Failed to restore checkpoint: {}", cpi, e);
diff --git
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
index f829a65..7e4be2a 100644
---
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
+++
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointFile.java
@@ -21,15 +21,21 @@
package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;
import com.google.common.hash.Hashing;
+import com.google.common.io.CountingInputStream;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
@@ -205,14 +211,45 @@ public class CheckpointFile {
public void copyFromRemote(CheckpointStore checkpointStore,
String dbPrefix,
- String checkpointId) throws IOException {
+ String checkpointId,
+ Duration idleWait) throws IOException,
TimeoutException {
String remoteFilePath = getRemotePath(dbPrefix, checkpointId, true);
+ CountingInputStream cis = new CountingInputStream(
+ checkpointStore.openInputStream(remoteFilePath)
+ );
+ CompletableFuture<Long> copyFuture = CompletableFuture.supplyAsync(()
-> {
+ try {
+ return java.nio.file.Files.copy(
+ cis, Paths.get(getFile().getAbsolutePath()),
+ StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ });
- try (InputStream is = checkpointStore.openInputStream(remoteFilePath))
{
- java.nio.file.Files.copy(
- is,
- Paths.get(file.getAbsolutePath()),
- StandardCopyOption.REPLACE_EXISTING);
+ long startMs = System.currentTimeMillis();
+
+ while (!copyFuture.isDone()) {
+ long lastCount = cis.getCount();
+ try {
+ // Wait of at most `idleWait` time for this to finish. If it is not done, we will get a
+ // TimeoutException. While handling the exception we will check if there was any progress. If there
+ // was some progress, we will try again.
+ copyFuture.get(idleWait.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException("Failed to copy file from remote
checkpoint: " + remoteFilePath, e);
+ } catch (TimeoutException e) {
+ // Check if we made any progress
+ long endMs = System.currentTimeMillis();
+ long newCount = cis.getCount();
+ log.info("Timeout waiting for copy: {} last-read {} current-read {} runtime(ms) {} ",
+ remoteFilePath, lastCount, newCount, endMs - startMs);
+ if (lastCount == newCount) {
+ throw new TimeoutException("No progress reading: " + remoteFilePath
+ + " read " + lastCount + " runtime(ms) " + (endMs - startMs)
+ );
+ }
+ }
}
}
diff --git
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointInfo.java
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointInfo.java
index 28a2968..166d308 100644
---
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointInfo.java
+++
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/CheckpointInfo.java
@@ -28,7 +28,9 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
@@ -70,7 +72,8 @@ public class CheckpointInfo implements
Comparable<CheckpointInfo> {
return new CheckpointInfo(UUID.randomUUID().toString()) {
public CheckpointMetadata restore(String dbName,
File dbPath,
- CheckpointStore store) throws
StateStoreException {
+ CheckpointStore store,
+ Duration maxIdle) throws
StateStoreException {
try {
Files.createDirectories(getCheckpointPath(dbPath));
updateCurrent(dbPath);
@@ -128,17 +131,26 @@ public class CheckpointInfo implements
Comparable<CheckpointInfo> {
return this.getCreatedAt().compareTo(o.getCreatedAt());
}
- public CheckpointMetadata restore(File dbPath, RocksdbRestoreTask task)
throws StateStoreException, IOException {
+ public CheckpointMetadata restore(File dbPath, RocksdbRestoreTask task)
+ throws StateStoreException, IOException, TimeoutException {
+
task.restore(id, metadata);
updateCurrent(dbPath);
log.info("Successfully restore checkpoint {} to {}", id,
getCheckpointPath(dbPath));
return metadata;
}
+ public CheckpointMetadata restore(String dbName, File dbPath,
CheckpointStore store)
+ throws StateStoreException, TimeoutException {
+
+ return restore(dbName, dbPath, store, Duration.ofMinutes(1));
+ }
+
+ public CheckpointMetadata restore(String dbName, File dbPath,
CheckpointStore store, Duration maxIdle)
+ throws StateStoreException, TimeoutException {
- public CheckpointMetadata restore(String dbName, File dbPath,
CheckpointStore store) throws StateStoreException {
try {
File checkpointsDir = new File(dbPath, "checkpoints");
- RocksdbRestoreTask task = new RocksdbRestoreTask(dbName,
checkpointsDir, store);
+ RocksdbRestoreTask task = new RocksdbRestoreTask(dbName,
checkpointsDir, store, maxIdle);
return restore(dbPath, task);
} catch (IOException ioe) {
log.error("Failed to restore rocksdb {}", dbName, ioe);
diff --git
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
index e3b7413..f5f507a 100644
---
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
+++
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
@@ -27,6 +27,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
@@ -44,7 +45,7 @@ public class RocksCheckpointer implements AutoCloseable {
public static CheckpointMetadata restore(String dbName,
File dbPath,
CheckpointStore checkpointStore)
- throws StateStoreException {
+ throws StateStoreException, TimeoutException {
CheckpointInfo checkpoint = getLatestCheckpoint(dbName,
checkpointStore);
return restore(checkpoint, dbName, dbPath, checkpointStore);
}
@@ -53,7 +54,7 @@ public class RocksCheckpointer implements AutoCloseable {
String dbName,
File dbPath,
CheckpointStore checkpointStore)
- throws StateStoreException {
+ throws StateStoreException, TimeoutException {
checkpoint.restore(dbName, dbPath, checkpointStore);
// after successfully restore from remote checkpoints, cleanup other
unused checkpoints
cleanupLocalCheckpoints(dbPath, checkpoint.getId());
diff --git
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
index 6038cb2..70dd2e3 100644
---
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
+++
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbRestoreTask.java
@@ -21,7 +21,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
@@ -37,17 +39,26 @@ public class RocksdbRestoreTask {
private final File checkpointDir;
private final CheckpointStore checkpointStore;
private final String dbPrefix;
+ private final Duration idleWait;
public RocksdbRestoreTask(String dbName,
File checkpointDir,
CheckpointStore checkpointStore) {
+ this(dbName, checkpointDir, checkpointStore, Duration.ofMinutes(5));
+ }
+
+ public RocksdbRestoreTask(String dbName,
+ File checkpointDir,
+ CheckpointStore checkpointStore,
+ Duration idleWait) {
this.dbName = dbName;
this.checkpointDir = checkpointDir;
this.checkpointStore = checkpointStore;
this.dbPrefix = String.format("%s", dbName);
+ this.idleWait = idleWait;
}
- public void restore(String checkpointId, CheckpointMetadata metadata)
throws StateStoreException {
+ public void restore(String checkpointId, CheckpointMetadata metadata)
throws StateStoreException, TimeoutException {
File checkpointedDir = new File(checkpointDir, checkpointId);
try {
@@ -72,9 +83,9 @@ public class RocksdbRestoreTask {
}
private void copyFilesFromRemote(String checkpointId,
- List<CheckpointFile> remoteFiles) throws
IOException {
+ List<CheckpointFile> remoteFiles) throws
IOException, TimeoutException {
for (CheckpointFile file : remoteFiles) {
- file.copyFromRemote(checkpointStore, dbPrefix, checkpointId);
+ file.copyFromRemote(checkpointStore, dbPrefix, checkpointId,
idleWait);
}
}
}
diff --git
a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java
index 20e7927..c4a5b23 100644
---
a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java
+++
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java
@@ -27,9 +27,11 @@ import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.coder.StringUtf8Coder;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
@@ -70,6 +72,7 @@ public class TestStateStore {
private boolean checkpointChecksumCompatible;
private boolean enableNonChecksumCompatibility;
private boolean localStorageCleanup;
+ private Duration checkpointRestoreIdleWait;
public TestStateStore(String dbName,
File localDir,
@@ -135,8 +138,24 @@ public class TestStateStore {
localStorageCleanup = enable;
}
+ public File getLocalCheckpointsDir() {
+ return localCheckpointsDir;
+ }
+
+ public CheckpointStore getCheckpointStore() {
+ return checkpointStore;
+ }
+
+ public void setCheckpointRestoreIdleWait(Duration d) {
+ checkpointRestoreIdleWait = d;
+ }
+
+ public CheckpointStore newCheckpointStore() {
+ return new FSCheckpointManager(remoteDir);
+ }
+
public void init() throws StateStoreException {
- checkpointStore = new FSCheckpointManager(remoteDir);
+ checkpointStore = newCheckpointStore();
StateStoreSpec.StateStoreSpecBuilder builder = StateStoreSpec.builder()
.name(dbName)
.keyCoder(StringUtf8Coder.of())
@@ -150,6 +169,9 @@ public class TestStateStore {
builder = builder.checkpointStore(checkpointStore)
.checkpointIOScheduler(checkpointExecutor);
}
+ if (checkpointRestoreIdleWait != null) {
+ builder =
builder.checkpointRestoreIdleLimit(checkpointRestoreIdleWait);
+ }
spec = builder.build();
store = new RocksdbKVStore<>();
store.init(spec);
@@ -202,7 +224,7 @@ public class TestStateStore {
this.init();
}
- CheckpointMetadata restore(CheckpointInfo checkpoint) throws
StateStoreException {
+ CheckpointMetadata restore(CheckpointInfo checkpoint) throws
StateStoreException, TimeoutException {
try {
MoreFiles.deleteRecursively(
checkpoint.getCheckpointPath(localDir),
diff --git
a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
index 37be2d3..28cf4b5 100644
---
a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
+++
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
@@ -25,23 +25,24 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
-
import java.io.File;
import java.io.FileInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-
import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.coder.StringUtf8Coder;
import org.apache.bookkeeper.common.kv.KV;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
@@ -59,11 +60,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
+import org.mockito.Mockito;
import org.rocksdb.Checkpoint;
/**
* Unit test of {@link RocksCheckpointer}.
*/
+@Slf4j
public class RocksCheckpointerTest {
@Rule
@@ -354,6 +357,108 @@ public class RocksCheckpointerTest {
}
}
+ InputStream getBlockedStream(TestStateStore testStore, String path,
Duration blockFor) throws IOException {
+ InputStream stream =
testStore.getCheckpointStore().openInputStream(path);
+
+
+ FilterInputStream res = new FilterInputStream(stream) {
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws
IOException {
+ try {
+ Thread.sleep(blockFor.toMillis());
+ } catch (InterruptedException e) {
+ }
+ return super.read(b, off, len);
+ }
+ };
+ return res;
+
+ }
+
+ @Test(timeout = 20000)
+ public void testRestoreBlockedSlow() throws Exception {
+ final int numKvs = 100;
+ TestStateStore testStore = Mockito.spy(new TestStateStore(
+ runtime.getMethodName(), localDir, remoteDir, true, false));
+
+ store.close();
+
+ testStore.enableCheckpoints(true);
+ testStore.setCheckpointRestoreIdleWait(Duration.ofSeconds(3));
+
+ testStore.init();
+
+ testStore.addNumKVs("transaction-1", numKvs, 0);
+ // create base checkpoint
+ String baseCheckpoint = testStore.checkpoint("checkpoint-1");
+ testStore.close();
+
+ String dbName = runtime.getMethodName();
+
+ CheckpointInfo checkpoint = testStore.getLatestCheckpoint();
+ CheckpointStore mockCheckpointStore =
Mockito.spy(testStore.getCheckpointStore());
+ File dbPath = localDir;
+
+ List<CheckpointFile> files =
CheckpointFile.list(testStore.getLocalCheckpointsDir(),
+ checkpoint.getMetadata());
+ String testFile = files.get(0).getRemotePath(dbName,
checkpoint.getId(), true);
+
+ // We wait for 10 sec for some data to show up. We will add the data
after 8 sec. So the restore should succeed.
+ Mockito.doReturn(getBlockedStream(testStore, testFile,
Duration.ofSeconds(2)))
+ .when(mockCheckpointStore)
+ .openInputStream(testFile);
+
+
Mockito.doReturn(mockCheckpointStore).when(testStore).newCheckpointStore();
+
+ try {
+ testStore.restore();
+ } catch (Exception e) {
+ fail("restore should succeed from slow stream");
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testRestoreBlockedTimeout() throws Exception {
+ final int numKvs = 100;
+ TestStateStore testStore = Mockito.spy(new TestStateStore(
+ runtime.getMethodName(), localDir, remoteDir, true, false));
+
+ store.close();
+
+ testStore.enableCheckpoints(true);
+ testStore.setCheckpointRestoreIdleWait(Duration.ofSeconds(10));
+
+ testStore.init();
+
+ testStore.addNumKVs("transaction-1", numKvs, 0);
+ // create base checkpoint
+ String baseCheckpoint = testStore.checkpoint("checkpoint-1");
+ testStore.close();
+
+ String dbName = runtime.getMethodName();
+
+ CheckpointInfo checkpoint = testStore.getLatestCheckpoint();
+ CheckpointStore mockCheckpointStore =
Mockito.spy(testStore.getCheckpointStore());
+ File dbPath = localDir;
+
+ List<CheckpointFile> files =
CheckpointFile.list(testStore.getLocalCheckpointsDir(),
+ checkpoint.getMetadata());
+ String testFile = files.get(0).getRemotePath(dbName,
checkpoint.getId(), true);
+
+ Mockito.doReturn(getBlockedStream(testStore, testFile,
Duration.ofSeconds(20)))
+ .when(mockCheckpointStore)
+ .openInputStream(testFile);
+
+
Mockito.doReturn(mockCheckpointStore).when(testStore).newCheckpointStore();
+
+ try {
+ testStore.restore();
+ fail("should Fail to restore from a blocked stream");
+ } catch (Exception e) {
+
+ }
+ }
+
@Test
public void testStaleSSTFile() throws Exception {
final int numKvs = 100;
diff --git
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index 289081b..8f3fa32 100644
---
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -37,6 +37,8 @@ public class StorageConfiguration extends
ComponentConfiguration {
private static final String LOCAL_STORAGE_CLEANUP_ENABLE =
"local.storage.cleanup.enable";
+ private static final String CHECKPOINT_RESTORE_IDLE_LIMIT_MS =
"checkpoint.restore.idle.limit.ms";
+
public StorageConfiguration(CompositeConfiguration conf) {
super(conf, COMPONENT_PREFIX);
}
@@ -106,4 +108,8 @@ public class StorageConfiguration extends
ComponentConfiguration {
public boolean getLocalStorageCleanupEnable() {
return getBoolean(LOCAL_STORAGE_CLEANUP_ENABLE, true);
}
+
+ public long getCheckpointRestoreIdleLimitMs() {
+ return getLong(CHECKPOINT_RESTORE_IDLE_LIMIT_MS,
TimeUnit.MINUTES.toMillis(5));
+ }
}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 527fb93..d1e05ef 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -212,6 +212,8 @@ public class MVCCStoreFactoryImpl implements
MVCCStoreFactory {
.checkpointChecksumEnable(storageConf.getCheckpointChecksumEnable())
.checkpointChecksumCompatible(storageConf.getCheckpointChecksumCompatible())
.localStorageCleanupEnable(storageConf.getLocalStorageCleanupEnable())
+ .checkpointRestoreIdleLimit(
+
Duration.ofMillis(storageConf.getCheckpointRestoreIdleLimitMs()))
.build();