This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 022d43b265a63e5dc9cb87a84e7f63e100339037 Author: Hang Chen <[email protected]> AuthorDate: Sun Jan 16 10:58:15 2022 +0800 Fix invalid rack name cause bookie join rack failed (#13683) * Fix invalid rackname cause bookie join rack failed (cherry picked from commit af761e2f8d3aee35c2cf081998f556972a92b574) --- .../BrokerInterceptorWithClassLoaderTest.java | 50 ---------------------- .../common/policies/data/impl/BookieInfoImpl.java | 10 +++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 18 ++++++++ .../org/apache/pulsar/admin/cli/CmdBookies.java | 13 ++++++ .../zookeeper/ZkBookieRackAffinityMapping.java | 5 ++- .../zookeeper/ZkBookieRackAffinityMappingTest.java | 29 +++++++++++++ 6 files changed, 74 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java index 668323b..5288ab9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java @@ -70,32 +70,6 @@ public class BrokerInterceptorWithClassLoaderTest { assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); } @Override - public void onConnectionCreated(ServerCnx cnx) { - assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); - } - @Override - public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata) { - assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); - } - @Override - public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata) { - assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); - } - @Override - public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, - long ledgerId, long entryId, Topic.PublishContext publishContext) { - assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); - } - @Override - public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId, - long entryId, ByteBuf headersAndPayload) { - assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); - } - @Override - public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) { - assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); - } - @Override public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException { assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); } @@ -150,30 +124,6 @@ public class BrokerInterceptorWithClassLoaderTest { // test onPulsarCommand brokerInterceptorWithClassLoader.onPulsarCommand(null, mock(ServerCnx.class)); assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); - // test messageAcked - brokerInterceptorWithClassLoader - .messageAcked(mock(ServerCnx.class), mock(Consumer.class), null); - assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); - // test messageDispatched - brokerInterceptorWithClassLoader - .messageDispatched(mock(ServerCnx.class), mock(Consumer.class), 1, 1, null); - assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); - // test messageProduced - brokerInterceptorWithClassLoader - .messageProduced(mock(ServerCnx.class), mock(Producer.class), 1, 1, 1, null); - assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); - // test consumerCreated - brokerInterceptorWithClassLoader - .consumerCreated(mock(ServerCnx.class), mock(Consumer.class), Maps.newHashMap()); - assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); - // test producerCreated - brokerInterceptorWithClassLoader - .producerCreated(mock(ServerCnx.class), mock(Producer.class), Maps.newHashMap()); - assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); - // test onConnectionCreated - brokerInterceptorWithClassLoader - .onConnectionCreated(mock(ServerCnx.class)); - assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); // test beforeSendMessage brokerInterceptorWithClassLoader .beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BookieInfoImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BookieInfoImpl.java index de316e6..b584989 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BookieInfoImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BookieInfoImpl.java @@ -21,6 +21,7 @@ package org.apache.pulsar.common.policies.data.impl; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.NonNull; import org.apache.pulsar.common.policies.data.BookieInfo; /** @@ -40,6 +41,7 @@ public final class BookieInfoImpl implements BookieInfo { public static class BookieInfoImplBuilder implements BookieInfo.Builder { private String rack; private String hostname; + private static final String PATH_SEPARATOR = "/"; public BookieInfoImplBuilder rack(String rack) { this.rack = rack; @@ -52,7 +54,15 @@ public final class BookieInfoImpl implements BookieInfo { } public BookieInfoImpl build() { + checkArgument(rack != null && !rack.isEmpty() && !rack.equals(PATH_SEPARATOR), + "rack name is invalid, it should not be null, empty or '/'"); return new BookieInfoImpl(rack, hostname); } + + public static void checkArgument(boolean expression, @NonNull Object errorMessage) { + if (!expression) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 300eefc..5b8b4da 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -1460,6 +1461,23 @@ public class PulsarAdminToolTest { .rack("rack-1") .hostname("host-1") .build()); + + // test invalid rack name "" + try { + BookieInfo.builder().rack("").hostname("host-1").build(); + fail(); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), "rack name is invalid, it should not be null, empty or '/'"); + } + + // test invalid rack name "/" + try { + BookieInfo.builder().rack("/").hostname("host-1").build(); + fail(); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), "rack name is invalid, it should not be null, empty or '/'"); + } + } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java index c15e8db..38fd847 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBookies.java @@ -25,6 +25,8 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import java.util.function.Supplier; +import com.google.common.base.Strings; +import lombok.NonNull; @Parameters(commandDescription = "Operations about bookies rack placement") public class CmdBookies extends CmdBase { @@ -73,6 +75,8 @@ public class CmdBookies extends CmdBase { @Parameters(commandDescription = "Updates the rack placement information for a specific bookie in the cluster (note. bookie address format:`address:port`)") private class UpdateBookie extends CliCommand { + private static final String PATH_SEPARATOR = "/"; + @Parameter(names = { "-g", "--group" }, description = "Bookie group name", required = false) private String group = "default"; @@ -87,12 +91,21 @@ public class CmdBookies extends CmdBase { @Override void run() throws Exception { + checkArgument(!Strings.isNullOrEmpty(bookieRack) && !bookieRack.trim().equals(PATH_SEPARATOR), + "rack name is invalid, it should not be null, empty or '/'"); + getAdmin().bookies().updateBookieRackInfo(bookieAddress, group, BookieInfo.builder() .rack(bookieRack) .hostname(bookieHost) .build()); } + + private void checkArgument(boolean expression, @NonNull Object errorMessage) { + if (!expression) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } } public CmdBookies(Supplier<PulsarAdmin> admin) { diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java index caeba51..86d3988 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java @@ -20,6 +20,7 @@ package org.apache.pulsar.zookeeper; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Strings; import java.net.InetAddress; import java.net.URI; import java.util.ArrayList; @@ -212,7 +213,9 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping } } - if (bi != null) { + if (bi != null + && !Strings.isNullOrEmpty(bi.getRack()) + && !bi.getRack().trim().equals("/")) { String rack = bi.getRack(); if (!rack.startsWith("/")) { rack = "/" + rack; diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java index 2f81406..fadffa7 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java @@ -27,6 +27,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; + import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.util.ZkUtils; @@ -34,6 +36,7 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.awaitility.Awaitility; @@ -104,6 +107,32 @@ public class ZkBookieRackAffinityMappingTest { assertEquals(racks2.get(2), null); localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1); + assertNull(racks1.get(2)); + } + + @Test + public void testInvalidRackName() throws InterruptedException, KeeperException { + String data = "{\"group1\": {\"" + BOOKIE1 + + "\": {\"rack\": \"/\", \"hostname\": \"bookie1.example.com\"}, \"" + BOOKIE2 + + "\": {\"rack\": \"\", \"hostname\": \"bookie2.example.com\"}}}"; + + ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Case1: ZKCache is given + ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping(); + ClientConfiguration bkClientConf1 = new ClientConfiguration(); + bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) { + }); + + mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + mapping1.setConf(bkClientConf1); + List<String> racks1 = mapping1 + .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName())); + + assertNull(racks1.get(0)); + assertNull(racks1.get(1)); + assertNull(racks1.get(2)); } @Test
