This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4338052 [Issue 3458: Tag Pulsar ledgers in order to distinguish from
other leggers in the same Bookkeeper cluster (#3525)
4338052 is described below
commit 43380523c5269c152f61b2aa8f7b70281c770d1d
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Feb 13 15:52:11 2019 +0100
[Issue 3458: Tag Pulsar ledgers in order to distinguish from other leggers
in the same Bookkeeper cluster (#3525)
Fixes #3458
### Motivation
See #3458
### Modifications
Add a new LedgerMetadataUtils class which holds the logic for building
"metadata" to be attached to
### Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
---
.../mledger/impl/LedgerMetadataUtils.java | 104 +++++++++++++++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 ++--
.../service/schema/BookkeeperSchemaStorage.java | 17 ++--
.../pulsar/compaction/TwoPhaseCompactor.java | 8 +-
site2/docs/cookbooks-bookkeepermetadata.md | 20 ++++
site2/website/sidebars.json | 3 +-
7 files changed, 157 insertions(+), 20 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
new file mode 100644
index 0000000..3a245d1
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Utilities for managing BookKeeper Ledgers custom metadata.
+ */
+public final class LedgerMetadataUtils {
+
+ private static final String METADATA_PROPERTY_APPLICATION = "application";
+ private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR
+ = "pulsar".getBytes(StandardCharsets.UTF_8);
+
+ private static final String METADATA_PROPERTY_COMPONENT = "component";
+ private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER
+ = "managed-ledger".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER
+ = "compacted-ledger".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA
+ = "schema".getBytes(StandardCharsets.UTF_8);
+
+ private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME =
"pulsar/managed-ledger";
+ private static final String METADATA_PROPERTY_CURSOR_NAME =
"pulsar/cursor";
+ private static final String METADATA_PROPERTY_COMPACTEDTOPIC =
"pulsar/compactedTopic";
+ private static final String METADATA_PROPERTY_COMPACTEDTO =
"pulsar/compactedTo";
+ private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
+
+ /**
+ * Build base metadata for every ManagedLedger.
+ *
+ * @param name the name of the ledger
+ * @return an immutable map which describes a ManagedLedger
+ */
+ static Map<String, byte[]> buildBaseManagedLedgerMetadata(String name) {
+ return ImmutableMap.of(
+ METADATA_PROPERTY_APPLICATION,
METADATA_PROPERTY_APPLICATION_PULSAR,
+ METADATA_PROPERTY_COMPONENT,
METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER,
+ METADATA_PROPERTY_MANAGED_LEDGER_NAME,
name.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Build additional metadata for a Cursor.
+ *
+ * @param name the name of the cursor
+ * @return an immutable map which describes the cursor
+ * @see #buildBaseManagedLedgerMetadata(java.lang.String)
+ */
+ static Map<String, byte[]> buildAdditionalMetadataForCursor(String name) {
+ return ImmutableMap.of(METADATA_PROPERTY_CURSOR_NAME,
name.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Build additional metadata for a CompactedLedger.
+ *
+ * @param compactedTopic reference to the compacted topic.
+ * @param compactedToMessageId last mesasgeId.
+ * @return an immutable map which describes the compacted ledger
+ */
+ public static Map<String, byte[]> buildMetadataForCompactedLedger(String
compactedTopic, byte[] compactedToMessageId) {
+ return ImmutableMap.of(
+ METADATA_PROPERTY_APPLICATION,
METADATA_PROPERTY_APPLICATION_PULSAR,
+ METADATA_PROPERTY_COMPONENT,
METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER,
+ METADATA_PROPERTY_COMPACTEDTOPIC,
compactedTopic.getBytes(StandardCharsets.UTF_8),
+ METADATA_PROPERTY_COMPACTEDTO, compactedToMessageId
+ );
+ }
+
+ /**
+ * Build additional metadata for a Schema
+ *
+ * @param schemaId id of the schema
+ * @return an immutable map which describes the schema
+ */
+ public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
+ return ImmutableMap.of(
+ METADATA_PROPERTY_APPLICATION,
METADATA_PROPERTY_APPLICATION_PULSAR,
+ METADATA_PROPERTY_COMPONENT,
METADATA_PROPERTY_COMPONENT_SCHEMA,
+ METADATA_PROPERTY_SCHEMAID,
schemaId.getBytes(StandardCharsets.UTF_8)
+ );
+ }
+
+ private LedgerMetadataUtils() {}
+
+}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 01b3a9c..1d5fe67 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -29,6 +29,7 @@ import static
org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
@@ -37,6 +38,7 @@ import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayDeque;
@@ -2015,7 +2017,6 @@ public class ManagedCursorImpl implements ManagedCursor {
void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();
-
ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx)
-> {
if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
@@ -2081,7 +2082,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
});
}));
- }, Collections.emptyMap());
+ }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 26b3acc..e1376f9 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.time.Clock;
@@ -106,7 +105,6 @@ import
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -119,6 +117,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
@@ -131,6 +131,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
protected final BookKeeper bookKeeper;
protected final String name;
+ private final Map<String, byte[]> ledgerMetadata;
private final BookKeeper.DigestType digestType;
protected ManagedLedgerConfig config;
@@ -249,6 +250,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.config = config;
this.store = store;
this.name = name;
+ this.ledgerMetadata =
LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType =
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
this.executor = orderedExecutor;
@@ -439,7 +441,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(),
ledgersStat, storeLedgersCb);
}));
- }, Collections.emptyMap());
+ }, ledgerMetadata);
}
private void initializeCursors(final ManagedLedgerInitializeLedgerCallback
callback) {
@@ -3013,13 +3015,19 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
* @param config
* @param digestType
* @param cb
- * @param emptyMap
+ * @param metadata
*/
protected void asyncCreateLedger(BookKeeper bookKeeper,
ManagedLedgerConfig config, DigestType digestType,
- CreateCallback cb, Map<Object, Object> emptyMap) {
+ CreateCallback cb, Map<String, byte[]> metadata) {
AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+ Map<String, byte[]> finalMetadata = new HashMap<>();
+ finalMetadata.putAll(ledgerMetadata);
+ finalMetadata.putAll(metadata);
+ if (log.isDebugEnabled()) {
+ log.debug("creating ledger, metadata: "+finalMetadata);
+ }
bookKeeper.asyncCreateLedger(config.getEnsembleSize(),
config.getWriteQuorumSize(), config.getAckQuorumSize(),
- digestType, config.getPassword(), cb, ledgerCreated,
Collections.emptyMap());
+ digestType, config.getPassword(), cb, ledgerCreated,
finalMetadata);
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {
ledgerCreated.set(true);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index f0e9699..47fcb62 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -27,13 +27,13 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -210,7 +211,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
if (optLocatorEntry.isPresent()) {
// Schema locator was already present
- return
addNewSchemaEntryToStore(optLocatorEntry.get().locator.getIndexList(), data)
+ return addNewSchemaEntryToStore(schemaId,
optLocatorEntry.get().locator.getIndexList(), data)
.thenCompose(position -> updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash));
} else {
// No schema was defined yet
@@ -259,7 +260,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return findSchemaEntryByHash(locator.getIndexList(),
hash).thenCompose(version -> {
if (isNull(version)) {
- return
addNewSchemaEntryToStore(locator.getIndexList(), data).thenCompose(
+ return addNewSchemaEntryToStore(schemaId,
locator.getIndexList(), data).thenCompose(
position -> updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash));
} else {
return completedFuture(version);
@@ -303,7 +304,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
.setLedgerId(-1L)
).build();
- return addNewSchemaEntryToStore(Collections.singletonList(emptyIndex),
data).thenCompose(position -> {
+ return addNewSchemaEntryToStore(schemaId,
Collections.singletonList(emptyIndex), data).thenCompose(position -> {
// The schema was stored in the ledger, now update the z-node with
the pointer to it
SchemaStorageFormat.IndexEntry info =
SchemaStorageFormat.IndexEntry.newBuilder()
.setVersion(0)
@@ -338,11 +339,12 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
@NotNull
private CompletableFuture<SchemaStorageFormat.PositionInfo>
addNewSchemaEntryToStore(
+ String schemaId,
List<SchemaStorageFormat.IndexEntry> index,
byte[] data
) {
SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index,
data);
- return createLedger().thenCompose(ledgerHandle ->
+ return createLedger(schemaId).thenCompose(ledgerHandle ->
addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
Functions.newPositionInfo(ledgerHandle.getId(), entryId)
)
@@ -497,7 +499,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
@NotNull
- private CompletableFuture<LedgerHandle> createLedger() {
+ private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
+ Map<String, byte[]> metadata =
LedgerMetadataUtils.buildMetadataForSchema(schemaId);
final CompletableFuture<LedgerHandle> future = new
CompletableFuture<>();
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
@@ -511,7 +514,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
} else {
future.complete(handle);
}
- }, null, Collections.emptyMap()
+ }, null, metadata
);
return future;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index d7e9bb1..b070f3b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
@@ -129,7 +130,7 @@ public class TwoPhaseCompactor extends Compactor {
Pair<String,Integer> keyAndSize =
extractKeyAndSize(m);
if (keyAndSize != null) {
if(keyAndSize.getRight() > 0) {
- latestForKey.put(keyAndSize.getLeft(),
id);
+ latestForKey.put(keyAndSize.getLeft(), id);
} else {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
@@ -165,8 +166,7 @@ public class TwoPhaseCompactor extends Compactor {
private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from,
MessageId to, MessageId lastReadId,
Map<String, MessageId> latestForKey, BookKeeper bk) {
- Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic",
reader.getTopic().getBytes(UTF_8),
- "compactedTo", to.toByteArray());
+ Map<String, byte[]> metadata =
LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(),
to.toByteArray());
return createLedger(bk, metadata).thenCompose((ledger) -> {
log.info("Commencing phase two of compaction for {}, from {} to
{}, compacting {} keys to ledger {}",
reader.getTopic(), from, to, latestForKey.size(),
ledger.getId());
@@ -228,7 +228,7 @@ public class TwoPhaseCompactor extends Compactor {
MessageId msg;
if (keyAndSize == null) { // pass through messages
without a key
messageToAdd = Optional.of(m);
- } else if ((msg =
latestForKey.get(keyAndSize.getLeft())) != null
+ } else if ((msg =
latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only
if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
promise.completeExceptionally(new
IllegalArgumentException(
diff --git a/site2/docs/cookbooks-bookkeepermetadata.md
b/site2/docs/cookbooks-bookkeepermetadata.md
new file mode 100644
index 0000000..187cb65
--- /dev/null
+++ b/site2/docs/cookbooks-bookkeepermetadata.md
@@ -0,0 +1,20 @@
+---
+id: cookbooks-bookkeepermetadata
+title: BookKeeper Ledger Metadata
+---
+
+Pulsar stores data on BookKeeper ledgers, you can understand the contents of a
ledger by inspecting the metadata attached to the ledger.
+Such metadata are stored on ZooKeeper and they are readable using BookKeeper
APIs.
+
+Description of current metadata:
+
+| Scope | Metadata name | Metadata value |
+| ------------- | ------------- | ------------- |
+| All ledgers | application | 'pulsar' |
+| All ledgers | component | 'managed-ledger', 'schema', 'compacted-topic' |
+| Managed ledgers | pulsar/managed-ledger | name of the ledger |
+| Cursor | pulsar/cursor | name of the cursor |
+| Compacted topic | pulsar/compactedTopic | name of the original topic |
+| Compacted topic | pulsar/compactedTo | id of the last compacted message |
+
+
diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json
index 12a2711..34d4fa9 100644
--- a/site2/website/sidebars.json
+++ b/site2/website/sidebars.json
@@ -99,7 +99,8 @@
"cookbooks-partitioned",
"cookbooks-retention-expiry",
"cookbooks-encryption",
- "cookbooks-message-queue"
+ "cookbooks-message-queue",
+ "cookbooks-bookkeepermetadata"
],
"Development": [
"develop-tools",