This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c69772b #2288: add metadata to StreamStorage ledgers (#2296)
c69772b is described below
commit c69772b1ace050b166ef69e3bc72d5f2c19cd5af
Author: dmercuriali <[email protected]>
AuthorDate: Mon Mar 23 12:29:13 2020 +0100
#2288: add metadata to StreamStorage ledgers (#2296)
Descriptions of the changes in this PR:
Added 'application' and 'component' metadata to StreamStorage ledgers.
application: bk-stream-storage-service
component: state-store / checkpoint-store
Master Issue: #2288
* #2288 add metadata to StreamStorage ledgers
* #2288 add metadata to StreamStorage ledgers
Co-authored-by: Dennis Mercuriali <[email protected]>
---
.../java/org/apache/bookkeeper/statelib/impl/Constants.java | 1 +
.../statelib/impl/journal/AbstractStateStoreWithJournal.java | 12 ++++++++++--
.../impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java | 7 ++++++-
3 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java
index eb5a1b3..d4ce08b 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java
@@ -27,6 +27,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class Constants {
+ public static final String LEDGER_METADATA_APPLICATION_STREAM_STORAGE = "bk-stream-storage-service";
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
public static final byte[] NULL_START_KEY = EMPTY_BYTE_ARRAY;
public static final byte[] NULL_END_KEY = new byte[] { 0 };
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
index 1ab5da6..9e566be 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreClosedException;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
+import org.apache.bookkeeper.statelib.impl.Constants;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
@@ -49,6 +50,7 @@ import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.util.Utils;
@@ -218,7 +220,10 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
} catch (IOException e) {
return FutureUtils.exception(e);
}
- return logManager.openAsyncLogWriter().thenComposeAsync(w -> {
+ LedgerMetadata metadata = new LedgerMetadata();
+ metadata.setApplication(Constants.LEDGER_METADATA_APPLICATION_STREAM_STORAGE);
+ metadata.setComponent("state-store");
+ return logManager.openAsyncLogWriter(metadata).thenComposeAsync(w -> {
synchronized (this) {
writer = w;
nextRevision = writer.getLastTxId();
@@ -233,7 +238,10 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
}
private CompletableFuture<DLSN> writeCatchUpMarker() {
- return logManager.openAsyncLogWriter().thenComposeAsync(w -> {
+ LedgerMetadata metadata = new LedgerMetadata();
+ metadata.setApplication(Constants.LEDGER_METADATA_APPLICATION_STREAM_STORAGE);
+ metadata.setComponent("state-store");
+ return logManager.openAsyncLogWriter(metadata).thenComposeAsync(w -> {
synchronized (this) {
writer = w;
nextRevision = writer.getLastTxId();
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
index 8dc0447..0e2de49 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
@@ -32,11 +32,13 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.bookkeeper.statelib.impl.Constants;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogExistsException;
@@ -98,7 +100,10 @@ public class DLCheckpointStore implements CheckpointStore {
try {
DistributedLogManager dlm = namespace.openLog(
filePath);
- AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+ LedgerMetadata metadata = new LedgerMetadata();
+ metadata.setApplication(Constants.LEDGER_METADATA_APPLICATION_STREAM_STORAGE);
+ metadata.setComponent("checkpoint-store");
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter(metadata));
return new BufferedOutputStream(
new DLOutputStream(dlm, writer), 128 * 1024);
} catch (LogNotFoundException le) {