This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 14d19585df7acb8b07eb6b08a67f945cf92a01a7 Author: Yong Zhang <[email protected]> AuthorDate: Sat Feb 13 03:58:25 2021 +0800 Avoid introducing bookkeeper-common into the pulsar-common (#9551) * Avoid introduce bookkeeper-common into the pulsar-common --- *Motivation* Direct using jackson to parse json to avoid introduce the bookkeeper-common into the pulsar-common. That will make other modules which are using pulsar-common has some unused bookkeeper dependencies. * Fix the build and add some tests * Address comments (cherry picked from commit 18e61b3989df66c6789574f72527144ff5fda25e) --- .../mledger/impl/LedgerMetadataUtils.java | 4 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 +- pulsar-common/pom.xml | 5 -- .../data/EnsemblePlacementPolicyConfig.java | 29 ++++++++++-- .../data/EnsemblePlacementPolicyConfigTest.java | 54 ++++++++++++++++++++++ .../ZkIsolatedBookieEnsemblePlacementPolicy.java | 3 +- 6 files changed, 87 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java index 1f59603..5edef0f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException; @@ -116,7 +117,8 @@ public final class LedgerMetadataUtils { * placement policy configuration encode error */ static Map<String, byte[]> buildMetadataForPlacementPolicyConfig( - Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties) throws ParseJsonException { + Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties) + throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException { EnsemblePlacementPolicyConfig config = new EnsemblePlacementPolicyConfig(className, properties); return ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode()); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9694f2d..f0c57e8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState; import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; + +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BoundType; import com.google.common.collect.ImmutableMap; @@ -119,6 +121,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.metadata.api.Stat; @@ -3303,7 +3306,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { config.getBookKeeperEnsemblePlacementPolicyClassName(), config.getBookKeeperEnsemblePlacementPolicyProperties() )); - } catch (JsonUtil.ParseJsonException e) { + } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) { log.error("[{}] Serialize the placement configuration failed", name, e); cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated); return; diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index f5ef697..a22f82d 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -102,11 +102,6 @@ </dependency> <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper-common</artifactId> - </dependency> - - <dependency> <groupId>io.airlift</groupId> <artifactId>aircompressor</artifactId> </dependency> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java index 2c42f14..64a0587 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java @@ -18,8 +18,9 @@ */ package org.apache.pulsar.common.policies.data; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Objects; -import org.apache.bookkeeper.common.util.JsonUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -66,11 +67,29 @@ public class EnsemblePlacementPolicyConfig { return false; } - public byte[] encode() throws JsonUtil.ParseJsonException { - return JsonUtil.toJson(this).getBytes(StandardCharsets.UTF_8); + public byte[] encode() throws ParseEnsemblePlacementPolicyConfigException { + try { + return ObjectMapperFactory.getThreadLocal() + .writerWithDefaultPrettyPrinter() + .writeValueAsString(this) + .getBytes(StandardCharsets.UTF_8); + } catch (JsonProcessingException e) { + throw new ParseEnsemblePlacementPolicyConfigException("Failed to encode to json", e); + } } - public static EnsemblePlacementPolicyConfig decode(byte[] data) throws JsonUtil.ParseJsonException { - return JsonUtil.fromJson(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class); + public static EnsemblePlacementPolicyConfig decode(byte[] data) throws ParseEnsemblePlacementPolicyConfigException { + try { + return ObjectMapperFactory.getThreadLocal() + .readValue(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class); + } catch (JsonProcessingException e) { + throw new ParseEnsemblePlacementPolicyConfigException("Failed to decode from json", e); + } + } + + public static class ParseEnsemblePlacementPolicyConfigException extends Exception { + ParseEnsemblePlacementPolicyConfigException(String message, Throwable throwable) { + super(message, throwable); + } } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java new file mode 100644 index 0000000..695b5b5 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Collections; + +public class EnsemblePlacementPolicyConfigTest { + + static class MockedEnsemblePlacementPolicy {} + + @Test + public void testEncodeDecodeSuccessfully() + throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException { + + EnsemblePlacementPolicyConfig originalConfig = + new EnsemblePlacementPolicyConfig(MockedEnsemblePlacementPolicy.class, Collections.EMPTY_MAP); + byte[] encodedConfig = originalConfig.encode(); + + EnsemblePlacementPolicyConfig decodedConfig = + EnsemblePlacementPolicyConfig.decode(encodedConfig); + Assert.assertEquals(decodedConfig, originalConfig); + } + + @Test + public void testDecodeFailed() { + byte[] configBytes = new byte[0]; + try { + EnsemblePlacementPolicyConfig.decode(configBytes); + Assert.fail("should failed parse the config from bytes"); + } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) { + // expected error + } + } +} diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java index 5cca446..e7f393a 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl; @@ -193,7 +194,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl if (ensemblePlacementPolicyConfigData != null) { try { return Optional.ofNullable(EnsemblePlacementPolicyConfig.decode(ensemblePlacementPolicyConfigData)); - } catch (JsonUtil.ParseJsonException e) { + } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) { LOG.error("Failed to parse the ensemble placement policy config from the custom metadata", e); return Optional.empty(); }
