This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 897203d909e89d7d31e06f97a3d90a709ef1a2f8
Author: Yong Zhang <[email protected]>
AuthorDate: Wed Dec 30 18:24:21 2020 +0800

    Fix auto-recovery not respecting the isolation group settings (#8961)
    
    ---
    
    *Motivation*
    
    When users configure ZkIsolatedBookieEnsemblePlacementPolicy, it is
    hard to make AutoRecovery respect the isolation group settings.
    Because the isolation group information is not stored as part of the
    ledger metadata, the framework has no information to use when
    choosing bookies.
    
    *Modifications*
    
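    - Store the ensemble placement policy config in the custom metadata of
    ledgers created by a managed ledger that has a placement policy
    class and properties configured
    - Change the ZkIsolatedBookieEnsemblePlacementPolicy to use the policy
    passed from the custom metadata when replacing a bookie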
    
    (cherry picked from commit bddd030aa1a0b8b3b1767b337880baeb35662ffd)
---
 .../mledger/impl/LedgerMetadataUtils.java          |  22 +++++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  39 +-------
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  22 +++++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  21 ++++-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   2 +-
 .../broker/service/BrokerBookieIsolationTest.java  |   2 +-
 pulsar-common/pom.xml                              |   5 +
 .../data/EnsemblePlacementPolicyConfig.java        |  76 ++++++++++++++++
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   | 101 +++++++++++++++++----
 ...kIsolatedBookieEnsemblePlacementPolicyTest.java |  65 +++++++++++++
 10 files changed, 298 insertions(+), 57 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 3a245d1..1f59603 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -19,6 +19,10 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
@@ -99,6 +103,24 @@ public final class LedgerMetadataUtils {
         );
     }
 
+    /**
+     * Build additional metadata for the placement policy config.
+     *
+     * @param className
+     *          the ensemble placement policy classname
+     * @param properties
+     *          the ensemble placement policy properties
+     * @return
+     *          the additional metadata
+     * @throws ParseJsonException
+     *          placement policy configuration encode error
+     */
+    static Map<String, byte[]> buildMetadataForPlacementPolicyConfig(
+        Class<? extends EnsemblePlacementPolicy> className, Map<String, 
Object> properties) throws ParseJsonException {
+        EnsemblePlacementPolicyConfig config = new 
EnsemblePlacementPolicyConfig(className, properties);
+        return 
ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, 
config.encode());
+    }
+
     private LedgerMetadataUtils() {}
 
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f411eff..7efb471 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
-import com.google.common.base.Objects;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
 
@@ -45,7 +44,6 @@ import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -81,6 +79,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
@@ -871,41 +870,5 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         BookKeeper get(EnsemblePlacementPolicyConfig 
ensemblePlacementPolicyMetadata);
     }
 
-    public static class EnsemblePlacementPolicyConfig {
-        private final Class<? extends EnsemblePlacementPolicy> policyClass;
-        private final Map<String, Object> properties;
-
-        public EnsemblePlacementPolicyConfig(Class<? extends 
EnsemblePlacementPolicy> policyClass,
-                Map<String, Object> properties) {
-            super();
-            this.policyClass = policyClass;
-            this.properties = properties;
-        }
-
-        public Class<? extends EnsemblePlacementPolicy> getPolicyClass() {
-            return policyClass;
-        }
-
-        public Map<String, Object> getProperties() {
-            return properties;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(policyClass != null ? 
policyClass.getName() : "", properties);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj instanceof EnsemblePlacementPolicyConfig) {
-                EnsemblePlacementPolicyConfig other = 
(EnsemblePlacementPolicyConfig) obj;
-                return Objects.equal(this.policyClass == null ? null : 
this.policyClass.getName(),
-                        other.policyClass == null ? null : 
other.policyClass.getName())
-                        && Objects.equal(this.properties, other.properties);
-            }
-            return false;
-        }
-    }
-
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ba50d2b..4fe5318 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -74,6 +74,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Retries;
@@ -249,6 +250,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
      */
     final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new 
ConcurrentLinkedQueue<>();
 
+    /**
+     * This variable is exposed for testing; it is read by
+     * {@link 
ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
+     */
+    @VisibleForTesting
+    Map<String, byte[]> createdLedgerCustomMetadata;
+
     // //////////////////////////////////////////////////////////////////////
 
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper 
bookKeeper, MetaStore store,
@@ -3239,6 +3247,20 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         Map<String, byte[]> finalMetadata = new HashMap<>();
         finalMetadata.putAll(ledgerMetadata);
         finalMetadata.putAll(metadata);
+        if (config.getBookKeeperEnsemblePlacementPolicyClassName() != null
+            && config.getBookKeeperEnsemblePlacementPolicyProperties() != 
null) {
+            try {
+                
finalMetadata.putAll(LedgerMetadataUtils.buildMetadataForPlacementPolicyConfig(
+                    config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                    config.getBookKeeperEnsemblePlacementPolicyProperties()
+                ));
+            } catch (JsonUtil.ParseJsonException e) {
+                log.error("[{}] Serialize the placement configuration failed", 
name, e);
+                cb.createComplete(Code.UnexpectedConditionException, null, 
ledgerCreated);
+                return;
+            }
+        }
+        createdLedgerCustomMetadata = finalMetadata;
         log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : 
{} seconds",
             name, finalMetadata, config.getMetadataOperationsTimeoutSeconds());
         try {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e6388c4..35f7921 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -69,6 +70,7 @@ import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
@@ -2791,6 +2793,23 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         }
     }
 
+    private abstract class MockedPlacementPolicy implements 
EnsemblePlacementPolicy{}
+
+    @Test(timeOut = 10000)
+    public void testManagedLedgerWithPlacementPolicyInCustomMetadata() throws 
Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(MockedPlacementPolicy.class);
+        
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(Collections.singletonMap("key",
 "value"));
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("my_test_ledger", managedLedgerConfig);
+        assertFalse(ledger.createdLedgerCustomMetadata.isEmpty());
+        byte[] configData = 
ledger.createdLedgerCustomMetadata.get(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG);
+        EnsemblePlacementPolicyConfig config = 
EnsemblePlacementPolicyConfig.decode(configData);
+        assertEquals(config.getPolicyClass().getName(), 
MockedPlacementPolicy.class.getName());
+        assertEquals(config.getProperties().size(), 1);
+        assertTrue(config.getProperties().containsKey("key"));
+        assertEquals(config.getProperties().get("key"), "value");
+    }
+
     private void setFieldValue(Class clazz, Object classObj, String fieldName, 
Object fieldValue) throws Exception {
         Field field = clazz.getDeclaredField(fieldName);
         field.setAccessible(true);
@@ -2806,4 +2825,4 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
             Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 5f6cdaf..398da08 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -30,12 +30,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
-import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import 
org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index bb0ae00..9662f3e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -40,7 +40,6 @@ import static 
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.net.BookieId;
@@ -63,6 +62,7 @@ import 
org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index a22f82d..f5ef697 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -102,6 +102,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>io.airlift</groupId>
       <artifactId>aircompressor</artifactId>
     </dependency>
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
new file mode 100644
index 0000000..2c42f14
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import com.google.common.base.Objects;
+import org.apache.bookkeeper.common.util.JsonUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+
+public class EnsemblePlacementPolicyConfig {
+    public static final String ENSEMBLE_PLACEMENT_POLICY_CONFIG = 
"EnsemblePlacementPolicyConfig";
+    private final Class policyClass;
+    private final Map<String, Object> properties;
+
+    // Default constructor, needed to construct this object when decoding it from bytes.
+    private EnsemblePlacementPolicyConfig() {
+        this.policyClass = null;
+        this.properties = Collections.emptyMap();
+    }
+
+    public EnsemblePlacementPolicyConfig(Class policyClass, Map<String, 
Object> properties) {
+        super();
+        this.policyClass = policyClass;
+        this.properties = properties;
+    }
+
+    public Class getPolicyClass() {
+        return policyClass;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(policyClass != null ? policyClass.getName() : 
"", properties);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof EnsemblePlacementPolicyConfig) {
+            EnsemblePlacementPolicyConfig other = 
(EnsemblePlacementPolicyConfig) obj;
+            return Objects.equal(this.policyClass == null ? null : 
this.policyClass.getName(),
+                other.policyClass == null ? null : other.policyClass.getName())
+                && Objects.equal(this.properties, other.properties);
+        }
+        return false;
+    }
+
+    public byte[] encode() throws JsonUtil.ParseJsonException {
+        return JsonUtil.toJson(this).getBytes(StandardCharsets.UTF_8);
+    }
+
+    public static EnsemblePlacementPolicyConfig decode(byte[] data) throws 
JsonUtil.ParseJsonException {
+        return JsonUtil.fromJson(new String(data, StandardCharsets.UTF_8), 
EnsemblePlacementPolicyConfig.class);
+    }
+}
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index efa4120..5cca446 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.zookeeper;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -29,18 +31,23 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
+import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
+import org.inferred.freebuilder.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +57,8 @@ import io.netty.util.HashedWheelTimer;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieAddressResolver;
 
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+
 public class ZkIsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlacementPolicy
         implements Deserializer<BookiesRackConfiguration> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);
@@ -59,9 +68,10 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePl
 
     private ZooKeeperCache bookieMappingCache = null;
 
-    private final List<String> primaryIsolationGroups = new ArrayList<>();
-    private final List<String> secondaryIsolationGroups = new ArrayList<>();
     private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
+    // Using a pair to save the isolation groups, the left value is the 
primary group and the right value is
+    // the secondary group.
+    private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
 
     public ZkIsolatedBookieEnsemblePlacementPolicy() {
         super();
@@ -71,6 +81,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePl
     public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration 
conf,
             Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer 
timer, FeatureProvider featureProvider,
             StatsLogger statsLogger, BookieAddressResolver 
bookieAddressResolver) {
+        Set<String> primaryIsolationGroups = new HashSet<>();
+        Set<String> secondaryIsolationGroups = new HashSet<>();
         if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
             String isolationGroupsString = 
castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS));
             if (!isolationGroupsString.isEmpty()) {
@@ -88,10 +100,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
                 }
             }
         }
+        defaultIsolationGroups = ImmutablePair.of(primaryIsolationGroups, 
secondaryIsolationGroups);
         return super.initialize(conf, optionalDnsResolver, timer, 
featureProvider, statsLogger, bookieAddressResolver);
     }
 
-    private String castToString(Object obj) {
+    private static String castToString(Object obj) {
+        if (null == obj) {
+            return "";
+        }
         if (obj instanceof List<?>) {
             List<String> result = new ArrayList<>();
             for (Object o : (List<?>) obj) {
@@ -134,7 +150,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
     public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
             throws BKNotEnoughBookiesException {
-        Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
+        Map<String, List<String>> isolationGroup = new HashMap<>();
+        Set<BookieId> blacklistedBookies = 
getBlacklistedBookiesWithIsolationGroups(
+            ensembleSize, defaultIsolationGroups);
         if (excludeBookies == null) {
             excludeBookies = new HashSet<BookieId>();
         }
@@ -147,7 +165,18 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
             Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble,
             BookieId bookieToReplace, Set<BookieId> excludeBookies)
             throws BKNotEnoughBookiesException {
-        Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
+        // parse the ensemble placement policy from the custom metadata, if it 
is present, we will apply it to
+        // the isolation groups for filtering the bookies.
+        Optional<EnsemblePlacementPolicyConfig> ensemblePlacementPolicyConfig =
+            getEnsemblePlacementPolicyConfig(customMetadata);
+        Set<BookieId> blacklistedBookies;
+        if (ensemblePlacementPolicyConfig.isPresent()) {
+            EnsemblePlacementPolicyConfig config = 
ensemblePlacementPolicyConfig.get();
+            Pair<Set<String>, Set<String>> groups = getIsolationGroup(config);
+            blacklistedBookies = 
getBlacklistedBookiesWithIsolationGroups(ensembleSize, groups);
+        } else {
+            blacklistedBookies = 
getBlacklistedBookiesWithIsolationGroups(ensembleSize, defaultIsolationGroups);
+        }
         if (excludeBookies == null) {
             excludeBookies = new HashSet<BookieId>();
         }
@@ -156,26 +185,66 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
                 bookieToReplace, excludeBookies);
     }
 
-    private Set<BookieId> getBlacklistedBookies(int ensembleSize) {
-        Set<BookieId> blacklistedBookies = new HashSet<BookieId>();
+    private static Optional<EnsemblePlacementPolicyConfig> 
getEnsemblePlacementPolicyConfig(
+        Map<String, byte[]> customMetadata) {
+
+        byte[] ensemblePlacementPolicyConfigData = customMetadata.get(
+            EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG);
+        if (ensemblePlacementPolicyConfigData != null) {
+            try {
+                return 
Optional.ofNullable(EnsemblePlacementPolicyConfig.decode(ensemblePlacementPolicyConfigData));
+            } catch (JsonUtil.ParseJsonException e) {
+                LOG.error("Failed to parse the ensemble placement policy 
config from the custom metadata", e);
+                return Optional.empty();
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static Pair<Set<String>, Set<String>> 
getIsolationGroup(EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
+        MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
+        String className = 
ZkIsolatedBookieEnsemblePlacementPolicy.class.getName();
+        if 
(ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
+            Map<String, Object> properties = 
ensemblePlacementPolicyConfig.getProperties();
+            String primaryIsolationGroupString = 
castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
+            String secondaryIsolationGroupString = 
castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
+            if (!primaryIsolationGroupString.isEmpty()) {
+                
pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
+            }
+            if (!secondaryIsolationGroupString.isEmpty()) {
+                
pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
+            }
+        }
+        return pair;
+    }
+
+    private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int 
ensembleSize,
+        Pair<Set<String>, Set<String>> isolationGroups) {
+        Set<BookieId> blacklistedBookies = new HashSet<>();
         try {
             if (bookieMappingCache != null) {
                 BookiesRackConfiguration allGroupsBookieMapping = 
bookieMappingCache
-                        
.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
-                        .orElseThrow(() -> new KeeperException.NoNodeException(
-                                
ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
+                    
.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
+                    .orElseThrow(() -> new KeeperException.NoNodeException(
+                        ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
                 Set<String> allBookies = allGroupsBookieMapping.keySet();
                 int totalAvailableBookiesInPrimaryGroup = 0;
+                Set<String> primaryIsolationGroup = Collections.emptySet();
+                Set<String> secondaryIsolationGroup = Collections.emptySet();
+                if (isolationGroups != null) {
+                    primaryIsolationGroup = isolationGroups.getLeft();
+                    secondaryIsolationGroup = isolationGroups.getRight();
+                }
                 for (String group : allBookies) {
                     Set<String> bookiesInGroup = 
allGroupsBookieMapping.get(group).keySet();
-                    if (!primaryIsolationGroups.contains(group)) {
+                    if (!primaryIsolationGroup.contains(group)) {
                         for (String bookieAddress : bookiesInGroup) {
                             
blacklistedBookies.add(BookieId.parse(bookieAddress));
                         }
                     } else {
                         for (String groupBookie : bookiesInGroup) {
                             totalAvailableBookiesInPrimaryGroup += knownBookies
-                                    .containsKey(BookieId.parse(groupBookie)) 
? 1 : 0;
+                                .containsKey(BookieId.parse(groupBookie)) ? 1 
: 0;
                         }
                     }
                 }
@@ -183,7 +252,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
                 // groups. so, same set of bookies could be overlapped into 
isolated-group and other default groups. so,
                 // try to remove those overlapped bookies from excluded-bookie 
list because they are also part of
                 // isolated-group bookies.
-                for (String group : primaryIsolationGroups) {
+                for (String group : primaryIsolationGroup) {
                     Map<String, BookieInfo> bookieGroup = 
allGroupsBookieMapping.get(group);
                     if (bookieGroup != null && !bookieGroup.isEmpty()) {
                         for (String bookieAddress : bookieGroup.keySet()) {
@@ -194,9 +263,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
                 // if primary-isolated-bookies are not enough then add 
consider secondary isolated bookie group as well.
                 if (totalAvailableBookiesInPrimaryGroup < ensembleSize) {
                     LOG.info(
-                            "Not found enough available-bookies from primary 
isolation group [{}] , checking secondary group [{}]",
-                            primaryIsolationGroups, secondaryIsolationGroups);
-                    for (String group : secondaryIsolationGroups) {
+                        "Not found enough available-bookies from primary 
isolation group [{}] , checking secondary group [{}]",
+                        primaryIsolationGroup, secondaryIsolationGroup);
+                    for (String group : secondaryIsolationGroup) {
                         Map<String, BookieInfo> bookieGroup = 
allGroupsBookieMapping.get(group);
                         if (bookieGroup != null && !bookieGroup.isEmpty()) {
                             for (String bookieAddress : bookieGroup.keySet()) {
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index e3f1a9c..6ed31fc 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.util.HashedWheelTimer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -449,4 +451,67 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
     }
+
+    /**
+     * test case for auto-recovery.
+     * When auto-recovery is triggered by bookkeeper, we need to make sure 
the placement policy can read from
+     * custom metadata and apply it when choosing the new bookie.
+     */
+    @Test
+    public void testTheIsolationPolicyUsingCustomMetadata() throws Exception {
+        // We configure two groups for the isolation policy: one is the 
'primary' group, and the other is
+        // the 'secondary' group.
+        // We put bookie1, bookie2, bookie3 into the 'primary' group, and put 
bookie4 into the 'secondary' group.
+        Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+        Map<String, BookieInfo> primaryIsolationBookieGroups = new HashMap<>();
+        String primaryGroupName = "primary";
+        String secondaryGroupName = "secondary";
+        primaryIsolationBookieGroups.put(BOOKIE1, new BookieInfo("rack0", 
null));
+        primaryIsolationBookieGroups.put(BOOKIE2, new BookieInfo("rack0", 
null));
+        primaryIsolationBookieGroups.put(BOOKIE3, new BookieInfo("rack1", 
null));
+
+        Map<String, BookieInfo> secondaryIsolationBookieGroups = new 
HashMap<>();
+        secondaryIsolationBookieGroups.put(BOOKIE4, new BookieInfo("rack0", 
null));
+        bookieMapping.put(primaryGroupName, primaryIsolationBookieGroups);
+        bookieMapping.put(secondaryGroupName, secondaryIsolationBookieGroups);
+
+        ZkUtils.createFullPathOptimistic(localZkc, 
ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+            jsonMapper.writeValueAsBytes(bookieMapping), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        Thread.sleep(100);
+
+        // Prepare a custom placement policy and put it into the custom 
metadata. The isolation policy should decode it
+        // from the custom metadata and apply it when getting the blacklist.
+        Map<String, Object> placementPolicyProperties = new HashMap<>();
+        placementPolicyProperties.put(
+            ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
primaryGroupName);
+        placementPolicyProperties.put(
+            
ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, 
secondaryGroupName);
+        EnsemblePlacementPolicyConfig policyConfig = new 
EnsemblePlacementPolicyConfig(
+            ZkIsolatedBookieEnsemblePlacementPolicy.class,
+            placementPolicyProperties
+        );
+        Map<String, byte[]> customMetadata = new HashMap<>();
+        
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
 policyConfig.encode());
+
+        // do the test logic
+        ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new 
ZkIsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new 
ZooKeeperCache("test", localZkc, 30) {
+        });
+        
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 primaryGroupName);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL,
+            NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        // We assume we have an ensemble list which consists of bookie1 
and bookie3, and bookie3 is broken.
+        // We want to get a replacement bookie from the 'primary' group, and that 
should be bookie2, because we only have
+        // bookie1, bookie2, and bookie3 in the 'primary' group.
+        BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId();
+        BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId();
+        BookieId bookie3Id = new BookieSocketAddress(BOOKIE3).toBookieId();
+        BookieId bookieId = isolationPolicy.replaceBookie(2, 1, 1, 
customMetadata,
+            Arrays.asList(bookie1Id, bookie3Id), bookie3Id, null).getResult();
+        assertEquals(bookieId, bookie2Id);
+    }
 }

Reply via email to