This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c69772b   #2288: add metadata to StreamStorage ledgers (#2296)
c69772b is described below

commit c69772b1ace050b166ef69e3bc72d5f2c19cd5af
Author: dmercuriali <[email protected]>
AuthorDate: Mon Mar 23 12:29:13 2020 +0100

     #2288: add metadata to StreamStorage ledgers (#2296)
    
    Descriptions of the changes in this PR:
    
    Added 'application' and 'component' metadata to StreamStorage ledgers.
    application: bk-stream-storage-service
    component: state-store / checkpoint-store
    
    Master Issue: #2288
    
    * #2288 add metadata to StreamStorage ledgers
    
    * #2288 add metadata to StreamStorage ledgers
    
    Co-authored-by: Dennis Mercuriali <[email protected]>
---
 .../java/org/apache/bookkeeper/statelib/impl/Constants.java  |  1 +
 .../statelib/impl/journal/AbstractStateStoreWithJournal.java | 12 ++++++++++--
 .../impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java      |  7 ++++++-
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java
index eb5a1b3..d4ce08b 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/Constants.java
@@ -27,6 +27,7 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class Constants {
 
+    public static final String LEDGER_METADATA_APPLICATION_STREAM_STORAGE = "bk-stream-storage-service";
     public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
     public static final byte[] NULL_START_KEY = EMPTY_BYTE_ARRAY;
     public static final byte[] NULL_END_KEY = new byte[] { 0 };
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
index 1ab5da6..9e566be 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.statelib.api.StateStoreSpec;
 import org.apache.bookkeeper.statelib.api.exceptions.StateStoreClosedException;
 import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
 import org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
+import org.apache.bookkeeper.statelib.impl.Constants;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.LogRecordWithDLSN;
@@ -49,6 +50,7 @@ import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.util.Utils;
@@ -218,7 +220,10 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
         } catch (IOException e) {
             return FutureUtils.exception(e);
         }
-        return logManager.openAsyncLogWriter().thenComposeAsync(w -> {
+        LedgerMetadata metadata = new LedgerMetadata();
+        metadata.setApplication(Constants.LEDGER_METADATA_APPLICATION_STREAM_STORAGE);
+        metadata.setComponent("state-store");
+        return logManager.openAsyncLogWriter(metadata).thenComposeAsync(w -> {
             synchronized (this) {
                 writer = w;
                 nextRevision = writer.getLastTxId();
@@ -233,7 +238,10 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
     }
 
     private CompletableFuture<DLSN> writeCatchUpMarker() {
-        return logManager.openAsyncLogWriter().thenComposeAsync(w -> {
+        LedgerMetadata metadata = new LedgerMetadata();
+        metadata.setApplication(Constants.LEDGER_METADATA_APPLICATION_STREAM_STORAGE);
+        metadata.setComponent("state-store");
+        return logManager.openAsyncLogWriter(metadata).thenComposeAsync(w -> {
             synchronized (this) {
                 writer = w;
                 nextRevision = writer.getLastTxId();
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
index 8dc0447..0e2de49 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
@@ -32,11 +32,13 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.bookkeeper.statelib.impl.Constants;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogExistsException;
@@ -98,7 +100,10 @@ public class DLCheckpointStore implements CheckpointStore {
         try {
             DistributedLogManager dlm = namespace.openLog(
                 filePath);
-            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+            LedgerMetadata metadata = new LedgerMetadata();
+            metadata.setApplication(Constants.LEDGER_METADATA_APPLICATION_STREAM_STORAGE);
+            metadata.setComponent("checkpoint-store");
+            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter(metadata));
             return new BufferedOutputStream(
                 new DLOutputStream(dlm, writer), 128 * 1024);
         } catch (LogNotFoundException le) {

Reply via email to