This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b84178a  ISSUE #2218: Write customMetadata on ledgers created by 
DistributedLog
b84178a is described below

commit b84178af53d631b268143f9b8b0a3c3f731bf68a
Author: dmercuriali <[email protected]>
AuthorDate: Mon Feb 17 14:03:15 2020 +0100

    ISSUE #2218: Write customMetadata on ledgers created by DistributedLog
    
    ### Changes
    
    Enable DistributedLog to attach custom metadata to underlying ledgers when 
writing logs.
    
    The user can pass a LedgerMetadata object to 
DistributedLogManager#openLogWriter and 
DistributedLogManager#openAsyncLogWriter. The LedgerMetadata object is then 
used by the Allocator backing the BKLogWriteHandler every time it must allocate 
a new ledger.
    
    Master Issue: #2218
    
    
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #2254 from 
dmercuriali/2218-distributedlog-ledger-custom-metadata, closes #2218
---
 .../apache/distributedlog/BKAbstractLogWriter.java |  8 ++-
 .../distributedlog/BKDistributedLogManager.java    | 30 +++++++--
 .../apache/distributedlog/BookKeeperClient.java    |  6 +-
 .../distributedlog/api/DistributedLogManager.java  | 25 ++++++++
 .../apache/distributedlog/bk/LedgerMetadata.java   | 72 ++++++++++++++++++++++
 .../distributedlog/bk/SimpleLedgerAllocator.java   | 50 +++++++++++++--
 .../impl/logsegment/BKLogSegmentEntryStore.java    | 17 ++++-
 .../logsegment/LogSegmentEntryStore.java           | 15 +++++
 .../TestBKDistributedLogManager.java               | 53 ++++++++++++++++
 .../distributedlog/bk/TestLedgerAllocator.java     | 33 +++++++++-
 10 files changed, 292 insertions(+), 17 deletions(-)

diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
index 275c8d7..cc3cb96 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
@@ -94,6 +95,11 @@ abstract class BKAbstractLogWriter implements Closeable, 
AsyncCloseable, Abortab
 
     protected BKLogWriteHandler createAndCacheWriteHandler()
             throws IOException {
+        return createAndCacheWriteHandler(null);
+    }
+
+    protected BKLogWriteHandler createAndCacheWriteHandler(LedgerMetadata 
ledgerMetadata)
+            throws IOException {
         synchronized (this) {
             if (writeHandler != null) {
                 return writeHandler;
@@ -102,7 +108,7 @@ abstract class BKAbstractLogWriter implements Closeable, 
AsyncCloseable, Abortab
         // This code path will be executed when the handler is not set or has 
been closed
         // due to forceRecovery during testing
         BKLogWriteHandler newHandler =
-                
Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false));
+                
Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false, 
ledgerMetadata));
         boolean success = false;
         try {
             synchronized (this) {
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index 645d6d4..b21b210 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -43,6 +43,7 @@ import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
 import org.apache.distributedlog.common.util.PermitLimiter;
@@ -335,10 +336,16 @@ class BKDistributedLogManager implements 
DistributedLogManager {
 
     public BKLogWriteHandler createWriteHandler(boolean lockHandler)
             throws IOException {
-        return Utils.ioResult(asyncCreateWriteHandler(lockHandler));
+        return createWriteHandler(lockHandler, null);
     }
 
-    CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(final boolean 
lockHandler) {
+    public BKLogWriteHandler createWriteHandler(boolean lockHandler, 
LedgerMetadata ledgerMetadata)
+            throws IOException {
+        return Utils.ioResult(asyncCreateWriteHandler(lockHandler, 
ledgerMetadata));
+    }
+
+    CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(final boolean 
lockHandler,
+                                                                 
LedgerMetadata ledgerMetadata) {
         // Fetching Log Metadata (create if not exists)
         return driver.getLogStreamMetadataStore(WRITER).getLog(
                 uri,
@@ -347,12 +354,13 @@ class BKDistributedLogManager implements 
DistributedLogManager {
                 conf.getCreateStreamIfNotExists()
         ).thenCompose(logMetadata -> {
             CompletableFuture<BKLogWriteHandler> createPromise = new 
CompletableFuture<BKLogWriteHandler>();
-            createWriteHandler(logMetadata, lockHandler, createPromise);
+            createWriteHandler(logMetadata, ledgerMetadata, lockHandler, 
createPromise);
             return createPromise;
         });
     }
 
     private void createWriteHandler(LogMetadataForWriter logMetadata,
+                                    LedgerMetadata ledgerMetadata,
                                     boolean lockHandler,
                                     final CompletableFuture<BKLogWriteHandler> 
createPromise) {
         // Build the locks
@@ -366,7 +374,7 @@ class BKDistributedLogManager implements 
DistributedLogManager {
         Allocator<LogSegmentEntryWriter, Object> segmentAllocator;
         try {
             segmentAllocator = driver.getLogSegmentEntryStore(WRITER)
-                    .newLogSegmentAllocator(logMetadata, dynConf);
+                    .newLogSegmentAllocator(logMetadata, dynConf, 
ledgerMetadata);
         } catch (IOException ioe) {
             FutureUtils.completeExceptionally(createPromise, ioe);
             return;
@@ -479,11 +487,16 @@ class BKDistributedLogManager implements 
DistributedLogManager {
 
     @Override
     public BKSyncLogWriter openLogWriter() throws IOException {
+        return openLogWriter(null);
+    }
+
+    @Override
+    public BKSyncLogWriter openLogWriter(LedgerMetadata ledgerMetadata) throws 
IOException {
         checkClosedOrInError("startLogSegmentNonPartitioned");
         BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this);
         boolean success = false;
         try {
-            writer.createAndCacheWriteHandler();
+            writer.createAndCacheWriteHandler(ledgerMetadata);
             BKLogWriteHandler writeHandler = writer.getWriteHandler();
             Utils.ioResult(writeHandler.lockHandler());
             success = true;
@@ -507,6 +520,11 @@ class BKDistributedLogManager implements 
DistributedLogManager {
 
     @Override
     public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
+        return openAsyncLogWriter(null);
+    }
+
+    @Override
+    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter(LedgerMetadata 
ledgerMetadata) {
         try {
             checkClosedOrInError("startLogSegmentNonPartitioned");
         } catch (AlreadyClosedException e) {
@@ -516,7 +534,7 @@ class BKDistributedLogManager implements 
DistributedLogManager {
         CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
         synchronized (this) {
             // 1. create the locked write handler
-            createWriteHandleFuture = asyncCreateWriteHandler(true);
+            createWriteHandleFuture = asyncCreateWriteHandler(true, 
ledgerMetadata);
         }
         return createWriteHandleFuture.thenCompose(writeHandler -> {
             final BKAsyncLogWriter writer;
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
index 1f69633..a03530d 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.distributedlog.ZooKeeperClient.Credentials;
 import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.net.NetUtils;
@@ -203,7 +204,8 @@ public class BookKeeperClient {
     // Util functions
     public CompletableFuture<LedgerHandle> createLedger(int ensembleSize,
                                                         int writeQuorumSize,
-                                                        int ackQuorumSize) {
+                                                        int ackQuorumSize,
+                                                        LedgerMetadata 
ledgerMetadata) {
         BookKeeper bk;
         try {
             bk = get();
@@ -221,7 +223,7 @@ public class BookKeeperClient {
                             
promise.completeExceptionally(BKException.create(rc));
                         }
                     }
-                }, null, Collections.emptyMap());
+                }, null, ledgerMetadata == null ? Collections.emptyMap() : 
ledgerMetadata.getMetadata());
         return promise;
     }
 
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
index c5b9e03..236247a 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
@@ -23,12 +23,14 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.distributedlog.AppendOnlyStreamReader;
 import org.apache.distributedlog.AppendOnlyStreamWriter;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.io.AsyncCloseable;
 import org.apache.distributedlog.namespace.NamespaceDriver;
@@ -105,6 +107,17 @@ public interface DistributedLogManager extends 
AsyncCloseable, Closeable {
     CompletableFuture<AsyncLogWriter> openAsyncLogWriter();
 
     /**
+     * Open async log writer to write records to the log stream.
+     * Provided metadata will be attached to the underlying BookKeeper ledgers.
+     *
+     * @param ledgerMetadata
+     * @return result represents the open result
+     */
+    default CompletableFuture<AsyncLogWriter> 
openAsyncLogWriter(LedgerMetadata ledgerMetadata) {
+        return FutureUtils.exception(new UnsupportedOperationException());
+    }
+
+    /**
      * Open sync log writer to write records to the log stream.
      *
      * @return sync log writer
@@ -113,6 +126,18 @@ public interface DistributedLogManager extends 
AsyncCloseable, Closeable {
     LogWriter openLogWriter() throws IOException;
 
     /**
+     * Open sync log writer to write records to the log stream.
+     * Provided metadata will be attached to the underlying BookKeeper ledgers.
+     *
+     * @param ledgerMetadata
+     * @return sync log writer
+     * @throws IOException when fails to open a sync log writer.
+     */
+    default LogWriter openLogWriter(LedgerMetadata ledgerMetadata) throws 
IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * Begin writing to the log stream identified by the name.
      *
      * @return the writer interface to generate log records
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/LedgerMetadata.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/LedgerMetadata.java
new file mode 100644
index 0000000..201adfd
--- /dev/null
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/LedgerMetadata.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2020 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.bk;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Ledger metadata.
+ */
+public final class LedgerMetadata {
+
+    private String application;
+    private String component;
+    private Map<String, String> customMetadata;
+
+    public void setApplication(String application) {
+        this.application = application;
+    }
+
+    public void setComponent(String component) {
+        this.component = component;
+    }
+
+    public void addCustomMetadata(String key, String value) {
+        if (key == null || "".equals(key.trim())) {
+            throw new IllegalArgumentException("Metadata key cant be empty");
+        }
+        if (value == null || "".equals(value.trim())) {
+            throw new IllegalArgumentException("Metadata value cant be empty");
+        }
+
+        if (customMetadata == null) {
+            customMetadata = new HashMap<>();
+        }
+
+        customMetadata.put(key, value);
+    }
+
+    public Map<String, byte[]> getMetadata() {
+        Map<String, byte[]> meta = new HashMap<>();
+        if (application != null) {
+            meta.put("application", 
application.getBytes(StandardCharsets.UTF_8));
+        }
+        if (component != null) {
+            meta.put("component", component.getBytes(StandardCharsets.UTF_8));
+        }
+
+        if (customMetadata != null) {
+            for (Map.Entry<String, String> e : customMetadata.entrySet()) {
+                String value = e.getValue();
+                meta.put(e.getKey(), value.getBytes(StandardCharsets.UTF_8));
+            }
+        }
+
+        return meta;
+    }
+}
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index 0115a6e..e866482 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -103,6 +103,8 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
     // Allocated Ledger
     LedgerHandle allocatedLh = null;
 
+    LedgerMetadata ledgerMetadata;
+
     CompletableFuture<Void> closeFuture = null;
     final LinkedList<CompletableFuture<Void>> ledgerDeletions =
             new LinkedList<CompletableFuture<Void>>();
@@ -159,14 +161,23 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
                                                    final QuorumConfigProvider 
quorumConfigProvider,
                                                    final ZooKeeperClient zkc,
                                                    final BookKeeperClient bkc) 
{
+        return SimpleLedgerAllocator.of(allocatePath, allocationData, 
quorumConfigProvider, zkc, bkc, null);
+    }
+
+    public static CompletableFuture<SimpleLedgerAllocator> of(final String 
allocatePath,
+                                                   final Versioned<byte[]> 
allocationData,
+                                                   final QuorumConfigProvider 
quorumConfigProvider,
+                                                   final ZooKeeperClient zkc,
+                                                   final BookKeeperClient bkc,
+                                                   final LedgerMetadata 
ledgerMetadata) {
         if (null != allocationData && null != allocationData.getValue()
                 && null != allocationData.getVersion()) {
             return FutureUtils.value(new SimpleLedgerAllocator(allocatePath, 
allocationData,
-                    quorumConfigProvider, zkc, bkc));
+                    quorumConfigProvider, zkc, bkc, ledgerMetadata));
         }
         return getAndCreateAllocationData(allocatePath, zkc)
             .thenApply(allocationData1 -> new 
SimpleLedgerAllocator(allocatePath, allocationData1,
-                        quorumConfigProvider, zkc, bkc));
+                        quorumConfigProvider, zkc, bkc, ledgerMetadata));
     }
 
     /**
@@ -188,10 +199,36 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
                                  QuorumConfigProvider quorumConfigProvider,
                                  ZooKeeperClient zkc,
                                  BookKeeperClient bkc) {
+        this(allocatePath, allocationData, quorumConfigProvider, zkc, bkc, 
null);
+    }
+
+    /**
+     * Construct a ledger allocator.
+     *
+     * @param allocatePath
+     *          znode path to store the allocated ledger.
+     * @param allocationData
+     *          allocation data.
+     * @param quorumConfigProvider
+     *          Quorum configuration provider.
+     * @param zkc
+     *          zookeeper client.
+     * @param bkc
+     *          bookkeeper client.
+     * @param ledgerMetadata
+     *          metadata to attach to allocated ledgers
+     */
+    public SimpleLedgerAllocator(String allocatePath,
+                                 Versioned<byte[]> allocationData,
+                                 QuorumConfigProvider quorumConfigProvider,
+                                 ZooKeeperClient zkc,
+                                 BookKeeperClient bkc,
+                                 LedgerMetadata ledgerMetadata) {
         this.zkc = zkc;
         this.bkc = bkc;
         this.allocatePath = allocatePath;
         this.quorumConfigProvider = quorumConfigProvider;
+        this.ledgerMetadata = ledgerMetadata;
         initialize(allocationData);
     }
 
@@ -231,7 +268,7 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
         }
         if (Phase.HANDED_OVER == phase) {
             // issue an allocate request when ledger is already handed over.
-            allocateLedger();
+            allocateLedger(ledgerMetadata);
         }
     }
 
@@ -318,6 +355,10 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
     }
 
     private synchronized void allocateLedger() {
+        allocateLedger(null);
+    }
+
+    private synchronized void allocateLedger(LedgerMetadata ledgerMetadata) {
         // make sure previous allocation is already handed over.
         if (Phase.HANDED_OVER != phase) {
             LOG.error("Trying allocate ledger for {} in phase {}, giving up.", 
allocatePath, phase);
@@ -329,7 +370,8 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
         bkc.createLedger(
                 quorumConfig.getEnsembleSize(),
                 quorumConfig.getWriteQuorumSize(),
-                quorumConfig.getAckQuorumSize()
+                quorumConfig.getAckQuorumSize(),
+                ledgerMetadata
         ).whenComplete(this);
     }
 
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
index 66e9940..fc14df6 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -36,6 +36,7 @@ import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.bk.DynamicQuorumConfigProvider;
 import org.apache.distributedlog.bk.LedgerAllocator;
 import org.apache.distributedlog.bk.LedgerAllocatorDelegator;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.bk.QuorumConfigProvider;
 import org.apache.distributedlog.bk.SimpleLedgerAllocator;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -153,7 +154,8 @@ public class BKLogSegmentEntryStore implements
     //
 
     LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata,
-                                          DynamicDistributedLogConfiguration 
dynConf)
+                                          DynamicDistributedLogConfiguration 
dynConf,
+                                          LedgerMetadata ledgerMetadata)
             throws IOException {
         LedgerAllocator ledgerAllocatorDelegator;
         if (null == allocator || !dynConf.getEnableLedgerAllocatorPool()) {
@@ -164,7 +166,8 @@ public class BKLogSegmentEntryStore implements
                     logMetadata.getAllocationData(),
                     quorumConfigProvider,
                     zkc,
-                    bkc);
+                    bkc,
+                    ledgerMetadata);
             ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, 
true);
         } else {
             ledgerAllocatorDelegator = allocator;
@@ -176,8 +179,16 @@ public class BKLogSegmentEntryStore implements
     public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
             LogMetadataForWriter logMetadata,
             DynamicDistributedLogConfiguration dynConf) throws IOException {
+        return newLogSegmentAllocator(logMetadata, dynConf, null);
+    }
+
+    @Override
+    public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
+            LogMetadataForWriter logMetadata,
+            DynamicDistributedLogConfiguration dynConf,
+            LedgerMetadata ledgerMetadata) throws IOException {
         // Build the ledger allocator
-        LedgerAllocator allocator = createLedgerAllocator(logMetadata, 
dynConf);
+        LedgerAllocator allocator = createLedgerAllocator(logMetadata, 
dynConf, ledgerMetadata);
         return new BKLogSegmentAllocator(allocator);
     }
 
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
index 4822b7e..f57a4b5 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.Beta;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.Allocator;
@@ -51,6 +52,20 @@ public interface LogSegmentEntryStore {
             DynamicDistributedLogConfiguration dynConf) throws IOException;
 
     /**
+     * Create a new log segment allocator for allocating log segment entry 
writers.
+     *
+     * @param metadata the metadata for the log stream
+     * @param ledgerMetadata metadata to be attached to underlying ledgers
+     * @return future represent the log segment allocator
+     */
+    default Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
+            LogMetadataForWriter metadata,
+            DynamicDistributedLogConfiguration dynConf,
+            LedgerMetadata ledgerMetadata) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * Open the reader for reading data to the log <i>segment</i>.
      *
      * @param segment the log <i>segment</i> to read data from
diff --git 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index db06891..3aada68 100644
--- 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -38,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.api.AsyncLogWriter;
@@ -47,6 +49,7 @@ import org.apache.distributedlog.api.LogWriter;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.bk.LedgerMetadata;
 import org.apache.distributedlog.callback.LogSegmentListener;
 import 
org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
 import org.apache.distributedlog.exceptions.BKTransmitException;
@@ -1247,4 +1250,54 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
             fail("Delete log twice should not throw any exception");
         }
     }
+
+    @Test(timeout = 60000)
+    public void testSyncLogWithLedgerMetadata() throws Exception {
+
+        String application = "myapplication";
+        String component = "mycomponent";
+        String custom = "mycustommetadata";
+        LedgerMetadata ledgerMetadata = new LedgerMetadata();
+        ledgerMetadata.setApplication(application);
+        ledgerMetadata.setComponent(component);
+        ledgerMetadata.addCustomMetadata("custom", custom);
+
+        BKDistributedLogManager dlm = createNewDLM(conf, 
"distrlog-writemetadata");
+
+        BKSyncLogWriter sync = dlm.openLogWriter(ledgerMetadata);
+        sync.write(DLMTestUtil.getLogRecordInstance(1));
+
+        LedgerHandle lh = getLedgerHandle(sync.getCachedLogWriter());
+        Map<String, byte[]> customMeta = lh.getCustomMetadata();
+        assertEquals(application, new String(customMeta.get("application"), 
UTF_8));
+        assertEquals(component, new String(customMeta.get("component"), 
UTF_8));
+        assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+
+        sync.closeAndComplete();
+    }
+
+    @Test(timeout = 60000)
+    public void testAsyncLogWithLedgerMetadata() throws Exception {
+        DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setWriteLockEnabled(false);
+
+        BKDistributedLogManager dlm = createNewDLM(confLocal, 
"distrlog-writemetadata");
+
+        String application = "myapplication";
+        String custom = "mycustommetadata";
+        LedgerMetadata ledgerMetadata = new LedgerMetadata();
+        ledgerMetadata.setApplication(application);
+        ledgerMetadata.addCustomMetadata("custom", custom);
+
+        AsyncLogWriter async = 
Utils.ioResult(dlm.openAsyncLogWriter(ledgerMetadata));
+        Utils.ioResult(async.write(DLMTestUtil.getLogRecordInstance(2)));
+
+        LedgerHandle lh = getLedgerHandle(((BKAsyncLogWriter) 
async).getCachedLogWriter());
+        Map<String, byte[]> customMeta = lh.getCustomMetadata();
+        assertEquals(application, new String(customMeta.get("application"), 
UTF_8));
+        assertNull(customMeta.get("component"));
+        assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+    }
 }
diff --git 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
index 011562e..50cde80 100644
--- 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
+++ 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import java.net.URI;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.BKException;
@@ -121,7 +122,14 @@ public class TestLedgerAllocator extends 
TestDistributedLogBase {
 
     private SimpleLedgerAllocator createAllocator(String allocationPath,
                                                   DistributedLogConfiguration 
conf) throws Exception {
-        return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null, 
newQuorumConfigProvider(conf), zkc, bkc));
+        return createAllocator(allocationPath, conf, null);
+    }
+
+    private SimpleLedgerAllocator createAllocator(String allocationPath,
+                                                  DistributedLogConfiguration 
conf,
+                                                  LedgerMetadata 
ledgerMetadata) throws Exception {
+        return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null,
+                newQuorumConfigProvider(conf), zkc, bkc, ledgerMetadata));
     }
 
     @FlakyTest("https://issues.apache.org/jira/browse/DL-43";)
@@ -378,4 +386,27 @@ public class TestLedgerAllocator extends 
TestDistributedLogBase {
         }
         assertEquals(numLedgers, allocatedLedgers.size());
     }
+
+    @Test(timeout = 60000)
+    public void testAllocationWithMetadata() throws Exception {
+        String allocationPath = "/" + runtime.getMethodName();
+
+        String application = "testApplicationMetadata";
+        String component = "testComponentMetadata";
+        String custom = "customMetadata";
+        LedgerMetadata ledgerMetadata = new LedgerMetadata();
+        ledgerMetadata.setApplication(application);
+        ledgerMetadata.setComponent(component);
+        ledgerMetadata.addCustomMetadata("custom", custom);
+
+        SimpleLedgerAllocator allocator = createAllocator(allocationPath, 
dlConf, ledgerMetadata);
+        allocator.allocate();
+
+        ZKTransaction txn = newTxn();
+        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, 
NULL_LISTENER));
+        Map<String, byte[]> customMeta = lh.getCustomMetadata();
+        assertEquals(application, new String(customMeta.get("application"), 
UTF_8));
+        assertEquals(component, new String(customMeta.get("component"), 
UTF_8));
+        assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+    }
 }

Reply via email to