This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 897203d909e89d7d31e06f97a3d90a709ef1a2f8 Author: Yong Zhang <[email protected]> AuthorDate: Wed Dec 30 18:24:21 2020 +0800 Fixes the recovery not respect to the isolation group settings (#8961) --- *Motivation* When users configure to use ZkIsolatedBookieEnsemblePlacementPolicy, it is hard to configure AutoRecovery to respect the isolation group settings. Because we don't store the isolation group information as part of ledger metadata, the framework doesn't have any information to use for choosing the bookies. *Modifications* - Change the ZkIsolatedBookieEnsemblePlacementPolicy to use the policy passed from the custom metadata (cherry picked from commit bddd030aa1a0b8b3b1767b337880baeb35662ffd) --- .../mledger/impl/LedgerMetadataUtils.java | 22 +++++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 39 +------- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 22 +++++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 21 ++++- .../pulsar/broker/ManagedLedgerClientFactory.java | 2 +- .../broker/service/BrokerBookieIsolationTest.java | 2 +- pulsar-common/pom.xml | 5 + .../data/EnsemblePlacementPolicyConfig.java | 76 ++++++++++++++++ .../ZkIsolatedBookieEnsemblePlacementPolicy.java | 101 +++++++++++++++++---- ...kIsolatedBookieEnsemblePlacementPolicyTest.java | 65 +++++++++++++ 10 files changed, 298 insertions(+), 57 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java index 3a245d1..1f59603 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java @@ -19,6 +19,10 @@ package org.apache.bookkeeper.mledger.impl; import com.google.common.collect.ImmutableMap; +import org.apache.bookkeeper.client.EnsemblePlacementPolicy; +import
org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; + import java.nio.charset.StandardCharsets; import java.util.Map; @@ -99,6 +103,24 @@ public final class LedgerMetadataUtils { ); } + /** + * Build additional metadata for the placement policy config. + * + * @param className + * the ensemble placement policy classname + * @param properties + * the ensemble placement policy properties + * @return + * the additional metadata + * @throws ParseJsonException + * placement policy configuration encode error + */ + static Map<String, byte[]> buildMetadataForPlacementPolicyConfig( + Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties) throws ParseJsonException { + EnsemblePlacementPolicyConfig config = new EnsemblePlacementPolicyConfig(className, properties); + return ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode()); + } + private LedgerMetadataUtils() {} } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f411eff..7efb471 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; -import com.google.common.base.Objects; import com.google.common.base.Predicates; import com.google.common.collect.Maps; @@ -45,7 +44,6 @@ import java.util.stream.Collectors; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; -import 
org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -81,6 +79,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.pulsar.common.util.DateFormatter; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore; @@ -871,41 +870,5 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { BookKeeper get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata); } - public static class EnsemblePlacementPolicyConfig { - private final Class<? extends EnsemblePlacementPolicy> policyClass; - private final Map<String, Object> properties; - - public EnsemblePlacementPolicyConfig(Class<? extends EnsemblePlacementPolicy> policyClass, - Map<String, Object> properties) { - super(); - this.policyClass = policyClass; - this.properties = properties; - } - - public Class<? extends EnsemblePlacementPolicy> getPolicyClass() { - return policyClass; - } - - public Map<String, Object> getProperties() { - return properties; - } - - @Override - public int hashCode() { - return Objects.hashCode(policyClass != null ? policyClass.getName() : "", properties); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof EnsemblePlacementPolicyConfig) { - EnsemblePlacementPolicyConfig other = (EnsemblePlacementPolicyConfig) obj; - return Objects.equal(this.policyClass == null ? null : this.policyClass.getName(), - other.policyClass == null ? 
null : other.policyClass.getName()) - && Objects.equal(this.properties, other.properties); - } - return false; - } - } - private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ba50d2b..4fe5318 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -74,6 +74,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.Backoff; +import org.apache.bookkeeper.common.util.JsonUtil; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Retries; @@ -249,6 +250,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { */ final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>(); + /** + * This variable is used for testing the tests + * {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()} + */ + @VisibleForTesting + Map<String, byte[]> createdLedgerCustomMetadata; + // ////////////////////////////////////////////////////////////////////// public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, @@ -3239,6 +3247,20 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { Map<String, byte[]> finalMetadata = new HashMap<>(); finalMetadata.putAll(ledgerMetadata); finalMetadata.putAll(metadata); + if (config.getBookKeeperEnsemblePlacementPolicyClassName() != null + && config.getBookKeeperEnsemblePlacementPolicyProperties() != 
null) { + try { + finalMetadata.putAll(LedgerMetadataUtils.buildMetadataForPlacementPolicyConfig( + config.getBookKeeperEnsemblePlacementPolicyClassName(), + config.getBookKeeperEnsemblePlacementPolicyProperties() + )); + } catch (JsonUtil.ParseJsonException e) { + log.error("[{}] Serialize the placement configuration failed", name, e); + cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated); + return; + } + } + createdLedgerCustomMetadata = finalMetadata; log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds", name, finalMetadata, config.getMetadataOperationsTimeoutSeconds()); try { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e6388c4..35f7921 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -69,6 +70,7 @@ import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.client.PulsarMockLedgerHandle; @@ -2791,6 +2793,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { } } + private abstract class MockedPlacementPolicy implements EnsemblePlacementPolicy{} + + 
@Test(timeOut = 10000) + public void testManagedLedgerWithPlacementPolicyInCustomMetadata() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(MockedPlacementPolicy.class); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(Collections.singletonMap("key", "value")); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", managedLedgerConfig); + assertFalse(ledger.createdLedgerCustomMetadata.isEmpty()); + byte[] configData = ledger.createdLedgerCustomMetadata.get(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG); + EnsemblePlacementPolicyConfig config = EnsemblePlacementPolicyConfig.decode(configData); + assertEquals(config.getPolicyClass().getName(), MockedPlacementPolicy.class.getName()); + assertEquals(config.getProperties().size(), 1); + assertTrue(config.getProperties().containsKey("key")); + assertEquals(config.getProperties().get("key"), "value"); + } + private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception { Field field = clazz.getDeclaredField(fieldName); field.setAccessible(true); @@ -2806,4 +2825,4 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i)); } } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 5f6cdaf..398da08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -30,12 +30,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.commons.configuration.Configuration; import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index bb0ae00..9662f3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -40,7 +40,6 @@ import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl. 
import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.meta.LedgerManager; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.net.BookieId; @@ -63,6 +62,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index a22f82d..f5ef697 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -102,6 +102,11 @@ </dependency> <dependency> + <groupId>org.apache.bookkeeper</groupId> + <artifactId>bookkeeper-common</artifactId> + </dependency> + + <dependency> <groupId>io.airlift</groupId> <artifactId>aircompressor</artifactId> </dependency> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java new file mode 100644 index 0000000..2c42f14 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import com.google.common.base.Objects; +import org.apache.bookkeeper.common.util.JsonUtil; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; + +public class EnsemblePlacementPolicyConfig { + public static final String ENSEMBLE_PLACEMENT_POLICY_CONFIG = "EnsemblePlacementPolicyConfig"; + private final Class policyClass; + private final Map<String, Object> properties; + + // Add a default constructor for decode data from bytes to construct this. + private EnsemblePlacementPolicyConfig() { + this.policyClass = null; + this.properties = Collections.emptyMap(); + } + + public EnsemblePlacementPolicyConfig(Class policyClass, Map<String, Object> properties) { + super(); + this.policyClass = policyClass; + this.properties = properties; + } + + public Class getPolicyClass() { + return policyClass; + } + + public Map<String, Object> getProperties() { + return properties; + } + + @Override + public int hashCode() { + return Objects.hashCode(policyClass != null ? 
policyClass.getName() : "", properties); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof EnsemblePlacementPolicyConfig) { + EnsemblePlacementPolicyConfig other = (EnsemblePlacementPolicyConfig) obj; + return Objects.equal(this.policyClass == null ? null : this.policyClass.getName(), + other.policyClass == null ? null : other.policyClass.getName()) + && Objects.equal(this.properties, other.properties); + } + return false; + } + + public byte[] encode() throws JsonUtil.ParseJsonException { + return JsonUtil.toJson(this).getBytes(StandardCharsets.UTF_8); + } + + public static EnsemblePlacementPolicyConfig decode(byte[] data) throws JsonUtil.ParseJsonException { + return JsonUtil.fromJson(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class); + } +} diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java index efa4120..5cca446 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java @@ -19,6 +19,8 @@ package org.apache.pulsar.zookeeper; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -29,18 +31,23 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl; +import org.apache.bookkeeper.common.util.JsonUtil; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.FeatureProvider; import 
org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; +import org.inferred.freebuilder.shaded.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +57,8 @@ import io.netty.util.HashedWheelTimer; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookieAddressResolver; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; + public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy implements Deserializer<BookiesRackConfiguration> { private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class); @@ -59,9 +68,10 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl private ZooKeeperCache bookieMappingCache = null; - private final List<String> primaryIsolationGroups = new ArrayList<>(); - private final List<String> secondaryIsolationGroups = new ArrayList<>(); private final ObjectMapper jsonMapper = ObjectMapperFactory.create(); + // Using a pair to save the isolation groups, the left value is the primary group and the right value is + // the secondary group. 
+ private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups; public ZkIsolatedBookieEnsemblePlacementPolicy() { super(); @@ -71,6 +81,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { + Set<String> primaryIsolationGroups = new HashSet<>(); + Set<String> secondaryIsolationGroups = new HashSet<>(); if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) { String isolationGroupsString = castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS)); if (!isolationGroupsString.isEmpty()) { @@ -88,10 +100,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl } } } + defaultIsolationGroups = ImmutablePair.of(primaryIsolationGroups, secondaryIsolationGroups); return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger, bookieAddressResolver); } - private String castToString(Object obj) { + private static String castToString(Object obj) { + if (null == obj) { + return ""; + } if (obj instanceof List<?>) { List<String> result = new ArrayList<>(); for (Object o : (List<?>) obj) { @@ -134,7 +150,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies) throws BKNotEnoughBookiesException { - Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize); + Map<String, List<String>> isolationGroup = new HashMap<>(); + Set<BookieId> blacklistedBookies = getBlacklistedBookiesWithIsolationGroups( + ensembleSize, defaultIsolationGroups); if (excludeBookies == null) { excludeBookies = new HashSet<BookieId>(); } @@ -147,7 
+165,18 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble, BookieId bookieToReplace, Set<BookieId> excludeBookies) throws BKNotEnoughBookiesException { - Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize); + // parse the ensemble placement policy from the custom metadata, if it is present, we will apply it to + // the isolation groups for filtering the bookies. + Optional<EnsemblePlacementPolicyConfig> ensemblePlacementPolicyConfig = + getEnsemblePlacementPolicyConfig(customMetadata); + Set<BookieId> blacklistedBookies; + if (ensemblePlacementPolicyConfig.isPresent()) { + EnsemblePlacementPolicyConfig config = ensemblePlacementPolicyConfig.get(); + Pair<Set<String>, Set<String>> groups = getIsolationGroup(config); + blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(ensembleSize, groups); + } else { + blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(ensembleSize, defaultIsolationGroups); + } if (excludeBookies == null) { excludeBookies = new HashSet<BookieId>(); } @@ -156,26 +185,66 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl bookieToReplace, excludeBookies); } - private Set<BookieId> getBlacklistedBookies(int ensembleSize) { - Set<BookieId> blacklistedBookies = new HashSet<BookieId>(); + private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolicyConfig( + Map<String, byte[]> customMetadata) { + + byte[] ensemblePlacementPolicyConfigData = customMetadata.get( + EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG); + if (ensemblePlacementPolicyConfigData != null) { + try { + return Optional.ofNullable(EnsemblePlacementPolicyConfig.decode(ensemblePlacementPolicyConfigData)); + } catch (JsonUtil.ParseJsonException e) { + LOG.error("Failed to parse the ensemble placement policy config from the custom metadata", e); + return Optional.empty(); + } + } + 
return Optional.empty(); + } + + private static Pair<Set<String>, Set<String>> getIsolationGroup(EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) { + MutablePair<Set<String>, Set<String>> pair = new MutablePair<>(); + String className = ZkIsolatedBookieEnsemblePlacementPolicy.class.getName(); + if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) { + Map<String, Object> properties = ensemblePlacementPolicyConfig.getProperties(); + String primaryIsolationGroupString = castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, "")); + String secondaryIsolationGroupString = castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, "")); + if (!primaryIsolationGroupString.isEmpty()) { + pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(","))); + } + if (!secondaryIsolationGroupString.isEmpty()) { + pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(","))); + } + } + return pair; + } + + private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize, + Pair<Set<String>, Set<String>> isolationGroups) { + Set<BookieId> blacklistedBookies = new HashSet<>(); try { if (bookieMappingCache != null) { BookiesRackConfiguration allGroupsBookieMapping = bookieMappingCache - .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this) - .orElseThrow(() -> new KeeperException.NoNodeException( - ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)); + .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this) + .orElseThrow(() -> new KeeperException.NoNodeException( + ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)); Set<String> allBookies = allGroupsBookieMapping.keySet(); int totalAvailableBookiesInPrimaryGroup = 0; + Set<String> primaryIsolationGroup = Collections.emptySet(); + Set<String> secondaryIsolationGroup = Collections.emptySet(); + if (isolationGroups != null) { + primaryIsolationGroup = isolationGroups.getLeft(); + secondaryIsolationGroup = 
isolationGroups.getRight(); + } for (String group : allBookies) { Set<String> bookiesInGroup = allGroupsBookieMapping.get(group).keySet(); - if (!primaryIsolationGroups.contains(group)) { + if (!primaryIsolationGroup.contains(group)) { for (String bookieAddress : bookiesInGroup) { blacklistedBookies.add(BookieId.parse(bookieAddress)); } } else { for (String groupBookie : bookiesInGroup) { totalAvailableBookiesInPrimaryGroup += knownBookies - .containsKey(BookieId.parse(groupBookie)) ? 1 : 0; + .containsKey(BookieId.parse(groupBookie)) ? 1 : 0; } } } @@ -183,7 +252,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl // groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so, // try to remove those overlapped bookies from excluded-bookie list because they are also part of // isolated-group bookies. - for (String group : primaryIsolationGroups) { + for (String group : primaryIsolationGroup) { Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group); if (bookieGroup != null && !bookieGroup.isEmpty()) { for (String bookieAddress : bookieGroup.keySet()) { @@ -194,9 +263,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl // if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well. 
if (totalAvailableBookiesInPrimaryGroup < ensembleSize) { LOG.info( - "Not found enough available-bookies from primary isolation group [{}] , checking secondary group [{}]", - primaryIsolationGroups, secondaryIsolationGroups); - for (String group : secondaryIsolationGroups) { + "Not found enough available-bookies from primary isolation group [{}] , checking secondary group [{}]", + primaryIsolationGroup, secondaryIsolationGroup); + for (String group : secondaryIsolationGroup) { Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group); if (bookieGroup != null && !bookieGroup.isEmpty()) { for (String bookieAddress : bookieGroup.keySet()) { diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java index e3f1a9c..6ed31fc 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.util.HashedWheelTimer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -45,6 +46,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.pulsar.common.policies.data.BookieInfo; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; @@ -449,4 +451,67 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { 
localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1); } + + /** + * test case for auto-recovery. + * When the auto-recovery trigger from bookkeeper, we need to make sure the placement policy can read from + * custom metadata and apply it when choosing the new bookie. + */ + @Test + public void testTheIsolationPolicyUsingCustomMetadata() throws Exception { + // We configure two groups for the isolation policy, one is the 'primary' group, and the another is + // 'secondary' group. + // We put bookie1, bookie2, bookie3 into the 'primary' group, and put bookie4 into the 'secondary' group. + Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>(); + Map<String, BookieInfo> primaryIsolationBookieGroups = new HashMap<>(); + String primaryGroupName = "primary"; + String secondaryGroupName = "secondary"; + primaryIsolationBookieGroups.put(BOOKIE1, new BookieInfo("rack0", null)); + primaryIsolationBookieGroups.put(BOOKIE2, new BookieInfo("rack0", null)); + primaryIsolationBookieGroups.put(BOOKIE3, new BookieInfo("rack1", null)); + + Map<String, BookieInfo> secondaryIsolationBookieGroups = new HashMap<>(); + secondaryIsolationBookieGroups.put(BOOKIE4, new BookieInfo("rack0", null)); + bookieMapping.put(primaryGroupName, primaryIsolationBookieGroups); + bookieMapping.put(secondaryGroupName, secondaryIsolationBookieGroups); + + ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, + jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + Thread.sleep(100); + + // prepare a custom placement policy and put it into the custom metadata. The isolation policy should decode + // from the custom metadata and apply it to the get black list method. 
+ Map<String, Object> placementPolicyProperties = new HashMap<>(); + placementPolicyProperties.put( + ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, primaryGroupName); + placementPolicyProperties.put( + ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, secondaryGroupName); + EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig( + ZkIsolatedBookieEnsemblePlacementPolicy.class, + placementPolicyProperties + ); + Map<String, byte[]> customMetadata = new HashMap<>(); + customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode()); + + // do the test logic + ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) { + }); + bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, primaryGroupName); + isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); + + // we assume we have an ensemble list which is consist with bookie1 and bookie3, and bookie3 is broken. + // we want to get a replace bookie from the 'primary' group and that should be bookie2. Because we only have + // bookie1, bookie2, and bookie3 in the 'primary' group. + BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId(); + BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId(); + BookieId bookie3Id = new BookieSocketAddress(BOOKIE3).toBookieId(); + BookieId bookieId = isolationPolicy.replaceBookie(2, 1, 1, customMetadata, + Arrays.asList(bookie1Id, bookie3Id), bookie3Id, null).getResult(); + assertEquals(bookieId, bookie2Id); + } }
