This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b84178a ISSUE #2218: Write customMetadata on ledgers created by
DistributedLog
b84178a is described below
commit b84178af53d631b268143f9b8b0a3c3f731bf68a
Author: dmercuriali <[email protected]>
AuthorDate: Mon Feb 17 14:03:15 2020 +0100
ISSUE #2218: Write customMetadata on ledgers created by DistributedLog
### Changes
Enable DistributedLog to attach custom metadata to underlying ledgers when
writing logs.
The user can pass a LedgerMetadata object to
DistributedLogManager#openLogWriter and
DistributedLogManager#openAsyncLogWriter. The LedgerMetadata object is then
used by the Allocator backing the BKLogWriteHandler every time it must allocate
a new ledger.
Master Issue: #2218
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #2254 from
dmercuriali/2218-distributedlog-ledger-custom-metadata, closes #2218
---
.../apache/distributedlog/BKAbstractLogWriter.java | 8 ++-
.../distributedlog/BKDistributedLogManager.java | 30 +++++++--
.../apache/distributedlog/BookKeeperClient.java | 6 +-
.../distributedlog/api/DistributedLogManager.java | 25 ++++++++
.../apache/distributedlog/bk/LedgerMetadata.java | 72 ++++++++++++++++++++++
.../distributedlog/bk/SimpleLedgerAllocator.java | 50 +++++++++++++--
.../impl/logsegment/BKLogSegmentEntryStore.java | 17 ++++-
.../logsegment/LogSegmentEntryStore.java | 15 +++++
.../TestBKDistributedLogManager.java | 53 ++++++++++++++++
.../distributedlog/bk/TestLedgerAllocator.java | 33 +++++++++-
10 files changed, 292 insertions(+), 17 deletions(-)
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
index 275c8d7..cc3cb96 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
@@ -94,6 +95,11 @@ abstract class BKAbstractLogWriter implements Closeable,
AsyncCloseable, Abortab
protected BKLogWriteHandler createAndCacheWriteHandler()
throws IOException {
+ return createAndCacheWriteHandler(null);
+ }
+
+ protected BKLogWriteHandler createAndCacheWriteHandler(LedgerMetadata
ledgerMetadata)
+ throws IOException {
synchronized (this) {
if (writeHandler != null) {
return writeHandler;
@@ -102,7 +108,7 @@ abstract class BKAbstractLogWriter implements Closeable,
AsyncCloseable, Abortab
// This code path will be executed when the handler is not set or has
been closed
// due to forceRecovery during testing
BKLogWriteHandler newHandler =
-
Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false));
+
Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false,
ledgerMetadata));
boolean success = false;
try {
synchronized (this) {
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index 645d6d4..b21b210 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -43,6 +43,7 @@ import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
import org.apache.distributedlog.common.util.PermitLimiter;
@@ -335,10 +336,16 @@ class BKDistributedLogManager implements
DistributedLogManager {
public BKLogWriteHandler createWriteHandler(boolean lockHandler)
throws IOException {
- return Utils.ioResult(asyncCreateWriteHandler(lockHandler));
+ return createWriteHandler(lockHandler, null);
}
- CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(final boolean
lockHandler) {
+ public BKLogWriteHandler createWriteHandler(boolean lockHandler,
LedgerMetadata ledgerMetadata)
+ throws IOException {
+ return Utils.ioResult(asyncCreateWriteHandler(lockHandler,
ledgerMetadata));
+ }
+
+ CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(final boolean
lockHandler,
+
LedgerMetadata ledgerMetadata) {
// Fetching Log Metadata (create if not exists)
return driver.getLogStreamMetadataStore(WRITER).getLog(
uri,
@@ -347,12 +354,13 @@ class BKDistributedLogManager implements
DistributedLogManager {
conf.getCreateStreamIfNotExists()
).thenCompose(logMetadata -> {
CompletableFuture<BKLogWriteHandler> createPromise = new
CompletableFuture<BKLogWriteHandler>();
- createWriteHandler(logMetadata, lockHandler, createPromise);
+ createWriteHandler(logMetadata, ledgerMetadata, lockHandler,
createPromise);
return createPromise;
});
}
private void createWriteHandler(LogMetadataForWriter logMetadata,
+ LedgerMetadata ledgerMetadata,
boolean lockHandler,
final CompletableFuture<BKLogWriteHandler>
createPromise) {
// Build the locks
@@ -366,7 +374,7 @@ class BKDistributedLogManager implements
DistributedLogManager {
Allocator<LogSegmentEntryWriter, Object> segmentAllocator;
try {
segmentAllocator = driver.getLogSegmentEntryStore(WRITER)
- .newLogSegmentAllocator(logMetadata, dynConf);
+ .newLogSegmentAllocator(logMetadata, dynConf,
ledgerMetadata);
} catch (IOException ioe) {
FutureUtils.completeExceptionally(createPromise, ioe);
return;
@@ -479,11 +487,16 @@ class BKDistributedLogManager implements
DistributedLogManager {
@Override
public BKSyncLogWriter openLogWriter() throws IOException {
+ return openLogWriter(null);
+ }
+
+ @Override
+ public BKSyncLogWriter openLogWriter(LedgerMetadata ledgerMetadata) throws
IOException {
checkClosedOrInError("startLogSegmentNonPartitioned");
BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this);
boolean success = false;
try {
- writer.createAndCacheWriteHandler();
+ writer.createAndCacheWriteHandler(ledgerMetadata);
BKLogWriteHandler writeHandler = writer.getWriteHandler();
Utils.ioResult(writeHandler.lockHandler());
success = true;
@@ -507,6 +520,11 @@ class BKDistributedLogManager implements
DistributedLogManager {
@Override
public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
+ return openAsyncLogWriter(null);
+ }
+
+ @Override
+ public CompletableFuture<AsyncLogWriter> openAsyncLogWriter(LedgerMetadata
ledgerMetadata) {
try {
checkClosedOrInError("startLogSegmentNonPartitioned");
} catch (AlreadyClosedException e) {
@@ -516,7 +534,7 @@ class BKDistributedLogManager implements
DistributedLogManager {
CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
synchronized (this) {
// 1. create the locked write handler
- createWriteHandleFuture = asyncCreateWriteHandler(true);
+ createWriteHandleFuture = asyncCreateWriteHandler(true,
ledgerMetadata);
}
return createWriteHandleFuture.thenCompose(writeHandler -> {
final BKAsyncLogWriter writer;
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
index 1f69633..a03530d 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.distributedlog.ZooKeeperClient.Credentials;
import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.net.NetUtils;
@@ -203,7 +204,8 @@ public class BookKeeperClient {
// Util functions
public CompletableFuture<LedgerHandle> createLedger(int ensembleSize,
int writeQuorumSize,
- int ackQuorumSize) {
+ int ackQuorumSize,
+ LedgerMetadata
ledgerMetadata) {
BookKeeper bk;
try {
bk = get();
@@ -221,7 +223,7 @@ public class BookKeeperClient {
promise.completeExceptionally(BKException.create(rc));
}
}
- }, null, Collections.emptyMap());
+ }, null, ledgerMetadata == null ? Collections.emptyMap() :
ledgerMetadata.getMetadata());
return promise;
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
index c5b9e03..236247a 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
@@ -23,12 +23,14 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.distributedlog.AppendOnlyStreamReader;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.namespace.NamespaceDriver;
@@ -105,6 +107,17 @@ public interface DistributedLogManager extends
AsyncCloseable, Closeable {
CompletableFuture<AsyncLogWriter> openAsyncLogWriter();
/**
+ * Open async log writer to write records to the log stream.
+ * Provided metadata will be attached to the underlying BookKeeper ledgers.
+ *
+ * <p>This default implementation does not open a writer: it returns the result of
+ * {@code FutureUtils.exception(new UnsupportedOperationException())}. Implementations
+ * that support ledger metadata are expected to override it.
+ *
+ * @param ledgerMetadata metadata to attach to the underlying BookKeeper ledgers
+ * @return result represents the open result
+ */
+ default CompletableFuture<AsyncLogWriter>
openAsyncLogWriter(LedgerMetadata ledgerMetadata) {
+ return FutureUtils.exception(new UnsupportedOperationException());
+ }
+
+ /**
* Open sync log writer to write records to the log stream.
*
* @return sync log writer
@@ -113,6 +126,18 @@ public interface DistributedLogManager extends
AsyncCloseable, Closeable {
LogWriter openLogWriter() throws IOException;
/**
+ * Open sync log writer to write records to the log stream.
+ * Provided metadata will be attached to the underlying BookKeeper ledgers.
+ *
+ * <p>This default implementation does not open a writer: it always throws
+ * {@link UnsupportedOperationException}. Implementations that support ledger
+ * metadata are expected to override it.
+ *
+ * @param ledgerMetadata metadata to attach to the underlying BookKeeper ledgers
+ * @return sync log writer
+ * @throws IOException when fails to open a sync log writer.
+ * @throws UnsupportedOperationException always, unless the method is overridden
+ */
+ default LogWriter openLogWriter(LedgerMetadata ledgerMetadata) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Begin writing to the log stream identified by the name.
*
* @return the writer interface to generate log records
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/LedgerMetadata.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/LedgerMetadata.java
new file mode 100644
index 0000000..201adfd
--- /dev/null
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/LedgerMetadata.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2020 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.bk;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Ledger metadata.
+ *
+ * <p>Mutable holder for an optional application name, an optional component name and
+ * any number of custom string key/value pairs. {@link #getMetadata()} converts whatever
+ * has been set into a {@code Map<String, byte[]>} with every value encoded as UTF-8.
+ *
+ * <p>Keys used in the resulting map: {@code "application"}, {@code "component"}, and each
+ * custom key as given to {@link #addCustomMetadata(String, String)}.
+ *
+ * <p>The class does no synchronization of its own.
+ */
+public final class LedgerMetadata {
+
+ private String application;
+ private String component;
+ private Map<String, String> customMetadata;
+ // Sets the value getMetadata() writes under the "application" key; null leaves the key out.
+ public void setApplication(String application) {
+ this.application = application;
+ }
+ // Sets the value getMetadata() writes under the "component" key; null leaves the key out.
+ public void setComponent(String component) {
+ this.component = component;
+ }
+ // Adds one custom entry; a null or blank (after trim) key or value throws IllegalArgumentException.
+ public void addCustomMetadata(String key, String value) {
+ if (key == null || "".equals(key.trim())) {
+ throw new IllegalArgumentException("Metadata key cant be empty");
+ }
+ if (value == null || "".equals(value.trim())) {
+ throw new IllegalArgumentException("Metadata value cant be empty");
+ }
+ // The backing map is created lazily, on the first accepted entry.
+ if (customMetadata == null) {
+ customMetadata = new HashMap<>();
+ }
+
+ customMetadata.put(key, value);
+ }
+ // Returns a freshly built map holding every value that has been set, encoded as UTF-8 bytes.
+ public Map<String, byte[]> getMetadata() {
+ Map<String, byte[]> meta = new HashMap<>();
+ if (application != null) {
+ meta.put("application",
application.getBytes(StandardCharsets.UTF_8));
+ }
+ if (component != null) {
+ meta.put("component", component.getBytes(StandardCharsets.UTF_8));
+ }
+ // Custom entries are copied last, so a custom "application"/"component" key replaces the value above.
+ if (customMetadata != null) {
+ for (Map.Entry<String, String> e : customMetadata.entrySet()) {
+ String value = e.getValue();
+ meta.put(e.getKey(), value.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ return meta;
+ }
+}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index 0115a6e..e866482 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -103,6 +103,8 @@ public class SimpleLedgerAllocator implements
LedgerAllocator, FutureEventListen
// Allocated Ledger
LedgerHandle allocatedLh = null;
+ LedgerMetadata ledgerMetadata;
+
CompletableFuture<Void> closeFuture = null;
final LinkedList<CompletableFuture<Void>> ledgerDeletions =
new LinkedList<CompletableFuture<Void>>();
@@ -159,14 +161,23 @@ public class SimpleLedgerAllocator implements
LedgerAllocator, FutureEventListen
final QuorumConfigProvider
quorumConfigProvider,
final ZooKeeperClient zkc,
final BookKeeperClient bkc)
{
+ return SimpleLedgerAllocator.of(allocatePath, allocationData,
quorumConfigProvider, zkc, bkc, null);
+ }
+
+ public static CompletableFuture<SimpleLedgerAllocator> of(final String
allocatePath,
+ final Versioned<byte[]>
allocationData,
+ final QuorumConfigProvider
quorumConfigProvider,
+ final ZooKeeperClient zkc,
+ final BookKeeperClient bkc,
+ final LedgerMetadata
ledgerMetadata) {
if (null != allocationData && null != allocationData.getValue()
&& null != allocationData.getVersion()) {
return FutureUtils.value(new SimpleLedgerAllocator(allocatePath,
allocationData,
- quorumConfigProvider, zkc, bkc));
+ quorumConfigProvider, zkc, bkc, ledgerMetadata));
}
return getAndCreateAllocationData(allocatePath, zkc)
.thenApply(allocationData1 -> new
SimpleLedgerAllocator(allocatePath, allocationData1,
- quorumConfigProvider, zkc, bkc));
+ quorumConfigProvider, zkc, bkc, ledgerMetadata));
}
/**
@@ -188,10 +199,36 @@ public class SimpleLedgerAllocator implements
LedgerAllocator, FutureEventListen
QuorumConfigProvider quorumConfigProvider,
ZooKeeperClient zkc,
BookKeeperClient bkc) {
+ this(allocatePath, allocationData, quorumConfigProvider, zkc, bkc,
null);
+ }
+
+ /**
+ * Construct a ledger allocator.
+ *
+ * <p>Stores the given clients, path, quorum provider and ledger metadata in fields,
+ * then calls {@code initialize(allocationData)}.
+ *
+ * <p>NOTE(review): only {@code allocateLedger(LedgerMetadata)} forwards the stored
+ * metadata to {@code bkc.createLedger}; the no-argument {@code allocateLedger()}
+ * overload passes {@code null}. Confirm no remaining caller relies on the
+ * no-argument form when metadata is expected.
+ *
+ * @param allocatePath
+ * znode path to store the allocated ledger.
+ * @param allocationData
+ * allocation data.
+ * @param quorumConfigProvider
+ * Quorum configuration provider.
+ * @param zkc
+ * zookeeper client.
+ * @param bkc
+ * bookkeeper client.
+ * @param ledgerMetadata
+ * metadata to attach to allocated ledgers; may be null (the five-argument
+ * constructor passes null), in which case BookKeeperClient#createLedger
+ * uses an empty custom metadata map.
+ */
+ public SimpleLedgerAllocator(String allocatePath,
+ Versioned<byte[]> allocationData,
+ QuorumConfigProvider quorumConfigProvider,
+ ZooKeeperClient zkc,
+ BookKeeperClient bkc,
+ LedgerMetadata ledgerMetadata) {
this.zkc = zkc;
this.bkc = bkc;
this.allocatePath = allocatePath;
this.quorumConfigProvider = quorumConfigProvider;
+ this.ledgerMetadata = ledgerMetadata;
initialize(allocationData);
}
@@ -231,7 +268,7 @@ public class SimpleLedgerAllocator implements
LedgerAllocator, FutureEventListen
}
if (Phase.HANDED_OVER == phase) {
// issue an allocate request when ledger is already handed over.
- allocateLedger();
+ allocateLedger(ledgerMetadata);
}
}
@@ -318,6 +355,10 @@ public class SimpleLedgerAllocator implements
LedgerAllocator, FutureEventListen
}
private synchronized void allocateLedger() {
+ allocateLedger(null);
+ }
+
+ private synchronized void allocateLedger(LedgerMetadata ledgerMetadata) {
// make sure previous allocation is already handed over.
if (Phase.HANDED_OVER != phase) {
LOG.error("Trying allocate ledger for {} in phase {}, giving up.",
allocatePath, phase);
@@ -329,7 +370,8 @@ public class SimpleLedgerAllocator implements
LedgerAllocator, FutureEventListen
bkc.createLedger(
quorumConfig.getEnsembleSize(),
quorumConfig.getWriteQuorumSize(),
- quorumConfig.getAckQuorumSize()
+ quorumConfig.getAckQuorumSize(),
+ ledgerMetadata
).whenComplete(this);
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
index 66e9940..fc14df6 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -36,6 +36,7 @@ import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.bk.DynamicQuorumConfigProvider;
import org.apache.distributedlog.bk.LedgerAllocator;
import org.apache.distributedlog.bk.LedgerAllocatorDelegator;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.bk.QuorumConfigProvider;
import org.apache.distributedlog.bk.SimpleLedgerAllocator;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -153,7 +154,8 @@ public class BKLogSegmentEntryStore implements
//
LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata,
- DynamicDistributedLogConfiguration
dynConf)
+ DynamicDistributedLogConfiguration
dynConf,
+ LedgerMetadata ledgerMetadata)
throws IOException {
LedgerAllocator ledgerAllocatorDelegator;
if (null == allocator || !dynConf.getEnableLedgerAllocatorPool()) {
@@ -164,7 +166,8 @@ public class BKLogSegmentEntryStore implements
logMetadata.getAllocationData(),
quorumConfigProvider,
zkc,
- bkc);
+ bkc,
+ ledgerMetadata);
ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator,
true);
} else {
ledgerAllocatorDelegator = allocator;
@@ -176,8 +179,16 @@ public class BKLogSegmentEntryStore implements
public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
LogMetadataForWriter logMetadata,
DynamicDistributedLogConfiguration dynConf) throws IOException {
+ return newLogSegmentAllocator(logMetadata, dynConf, null);
+ }
+
+ @Override
+ public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
+ LogMetadataForWriter logMetadata,
+ DynamicDistributedLogConfiguration dynConf,
+ LedgerMetadata ledgerMetadata) throws IOException {
// Build the ledger allocator
- LedgerAllocator allocator = createLedgerAllocator(logMetadata,
dynConf);
+ LedgerAllocator allocator = createLedgerAllocator(logMetadata,
dynConf, ledgerMetadata);
return new BKLogSegmentAllocator(allocator);
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
index 4822b7e..f57a4b5 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.Beta;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.util.Allocator;
@@ -51,6 +52,20 @@ public interface LogSegmentEntryStore {
DynamicDistributedLogConfiguration dynConf) throws IOException;
/**
+ * Create a new log segment allocator for allocating log segment entry writers.
+ * Ledger metadata passed here is meant to be attached to the underlying ledgers.
+ *
+ * <p>This default implementation always throws {@link UnsupportedOperationException};
+ * stores that support ledger metadata are expected to override it.
+ *
+ * @param metadata the metadata for the log stream
+ * @param dynConf dynamic configuration consulted when building the allocator
+ * @param ledgerMetadata metadata to be attached to underlying ledgers
+ * @return the log segment allocator
+ * @throws IOException declared for implementations; not thrown by this default
+ */
+ default Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
+ LogMetadataForWriter metadata,
+ DynamicDistributedLogConfiguration dynConf,
+ LedgerMetadata ledgerMetadata) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Open the reader for reading data to the log <i>segment</i>.
*
* @param segment the log <i>segment</i> to read data from
diff --git
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index db06891..3aada68 100644
---
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -38,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
@@ -47,6 +49,7 @@ import org.apache.distributedlog.api.LogWriter;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.callback.LogSegmentListener;
import
org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.BKTransmitException;
@@ -1247,4 +1250,54 @@ public class TestBKDistributedLogManager extends
TestDistributedLogBase {
fail("Delete log twice should not throw any exception");
}
}
+
+ @Test(timeout = 60000)
+ public void testSyncLogWithLedgerMetadata() throws Exception {
+ // Metadata under test: application, component and one custom entry.
+ String application = "myapplication";
+ String component = "mycomponent";
+ String custom = "mycustommetadata";
+ LedgerMetadata ledgerMetadata = new LedgerMetadata();
+ ledgerMetadata.setApplication(application);
+ ledgerMetadata.setComponent(component);
+ ledgerMetadata.addCustomMetadata("custom", custom);
+
+ BKDistributedLogManager dlm = createNewDLM(conf,
"distrlog-writemetadata");
+ // Open the sync writer with that metadata and write one record.
+ BKSyncLogWriter sync = dlm.openLogWriter(ledgerMetadata);
+ sync.write(DLMTestUtil.getLogRecordInstance(1));
+ // The ledger handle behind the cached writer must expose all three values as UTF-8 bytes.
+ LedgerHandle lh = getLedgerHandle(sync.getCachedLogWriter());
+ Map<String, byte[]> customMeta = lh.getCustomMetadata();
+ assertEquals(application, new String(customMeta.get("application"),
UTF_8));
+ assertEquals(component, new String(customMeta.get("component"),
UTF_8));
+ assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+
+ sync.closeAndComplete();
+ }
+
+ @Test(timeout = 60000)
+ public void testAsyncLogWithLedgerMetadata() throws Exception {
+ DistributedLogConfiguration confLocal = new
DistributedLogConfiguration();
+ confLocal.addConfiguration(conf);
+ confLocal.setOutputBufferSize(0);
+ confLocal.setWriteLockEnabled(false);
+ // The local configuration above uses output buffer size 0 and disables the write lock.
+ BKDistributedLogManager dlm = createNewDLM(confLocal,
"distrlog-writemetadata");
+ // Metadata under test: application and one custom entry; component is left unset.
+ String application = "myapplication";
+ String custom = "mycustommetadata";
+ LedgerMetadata ledgerMetadata = new LedgerMetadata();
+ ledgerMetadata.setApplication(application);
+ ledgerMetadata.addCustomMetadata("custom", custom);
+ // Open the async writer with that metadata and wait for one write to complete.
+ AsyncLogWriter async =
Utils.ioResult(dlm.openAsyncLogWriter(ledgerMetadata));
+ Utils.ioResult(async.write(DLMTestUtil.getLogRecordInstance(2)));
+ // Unset component must be absent. NOTE(review): the writer and dlm are never closed here - confirm.
+ LedgerHandle lh = getLedgerHandle(((BKAsyncLogWriter)
async).getCachedLogWriter());
+ Map<String, byte[]> customMeta = lh.getCustomMetadata();
+ assertEquals(application, new String(customMeta.get("application"),
UTF_8));
+ assertNull(customMeta.get("component"));
+ assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+ }
}
diff --git
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
index 011562e..50cde80 100644
---
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
+++
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException;
@@ -121,7 +122,14 @@ public class TestLedgerAllocator extends
TestDistributedLogBase {
private SimpleLedgerAllocator createAllocator(String allocationPath,
DistributedLogConfiguration
conf) throws Exception {
- return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null,
newQuorumConfigProvider(conf), zkc, bkc));
+ return createAllocator(allocationPath, conf, null);
+ }
+
+ private SimpleLedgerAllocator createAllocator(String allocationPath,
+ DistributedLogConfiguration
conf,
+ LedgerMetadata
ledgerMetadata) throws Exception {
+ return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null,
+ newQuorumConfigProvider(conf), zkc, bkc, ledgerMetadata));
}
@FlakyTest("https://issues.apache.org/jira/browse/DL-43")
@@ -378,4 +386,27 @@ public class TestLedgerAllocator extends
TestDistributedLogBase {
}
assertEquals(numLedgers, allocatedLedgers.size());
}
+
+ @Test(timeout = 60000)
+ public void testAllocationWithMetadata() throws Exception {
+ String allocationPath = "/" + runtime.getMethodName();
+ // Metadata under test: application, component and one custom entry.
+ String application = "testApplicationMetadata";
+ String component = "testComponentMetadata";
+ String custom = "customMetadata";
+ LedgerMetadata ledgerMetadata = new LedgerMetadata();
+ ledgerMetadata.setApplication(application);
+ ledgerMetadata.setComponent(component);
+ ledgerMetadata.addCustomMetadata("custom", custom);
+ // Build the allocator with that metadata and request an allocation.
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath,
dlConf, ledgerMetadata);
+ allocator.allocate();
+ // Obtain the allocated ledger in a transaction and check its custom metadata as UTF-8 strings.
+ ZKTransaction txn = newTxn();
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn,
NULL_LISTENER));
+ Map<String, byte[]> customMeta = lh.getCustomMetadata();
+ assertEquals(application, new String(customMeta.get("application"),
UTF_8));
+ assertEquals(component, new String(customMeta.get("component"),
UTF_8));
+ assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+ }
}