This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit d3477bed870ba7778d55ed49ea14e30a09c88cdc Author: Hunter Lee <[email protected]> AuthorDate: Wed Jul 22 11:10:42 2020 -0700 Implement ZkRoutingDataReader In order to allow certain users to use ZK as the sole routing data source, we add ZkRoutingDataReader that transiently creates a ZK connection to read the routing data from the routing ZK. --- .../zookeeper/routing/ZkRoutingDataReader.java | 81 +++++++++++++++++++++- .../helix/zookeeper/constant/TestConstants.java | 16 +++-- .../zookeeper/routing/TestZkRoutingDataReader.java | 75 +++++++++++++++++++- 3 files changed, 164 insertions(+), 8 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java index 3db3b55..48c8604 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java @@ -1,4 +1,83 @@ package org.apache.helix.zookeeper.routing; -public class ZkRoutingDataReader { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.exception.MultiZkException; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; + + +/** + * Zk-based RoutingDataReader that establishes a ZK connection to the routing ZK to fetch routing + * data. + * The reading of routing data by nature should only be performed in cases of a Helix client + * initialization or routing data reset. That means we do not have to maintain an active ZK + * connection. To minimize the number of client-side ZK connections, ZkRoutingDataReader establishes + * a ZK session temporarily only to read from ZK afresh and closes sessions upon read completion. + */ +public class ZkRoutingDataReader implements RoutingDataReader { + + /** + * Returns a map form of metadata store routing data. + * The map fields stand for metadata store realm address (key), and a corresponding list of ZK + * path sharding keys (value). + * @param endpoint + * @return + */ + @Override + public Map<String, List<String>> getRawRoutingData(String endpoint) { + ZkClient zkClient = + new ZkClient.Builder().setZkServer(endpoint).setZkSerializer(new ZNRecordSerializer()) + .build(); + + Map<String, List<String>> routingData = new HashMap<>(); + List<String> allRealmAddresses; + try { + allRealmAddresses = zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH); + } catch (ZkNoNodeException e) { + throw new MultiZkException( + "Routing data directory ZNode " + MetadataStoreRoutingConstants.ROUTING_DATA_PATH + + " does not exist. 
Routing ZooKeeper address: " + endpoint); + } + if (allRealmAddresses != null) { + for (String realmAddress : allRealmAddresses) { + ZNRecord record = zkClient + .readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realmAddress, true); + if (record != null) { + List<String> shardingKeys = + record.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY); + routingData + .put(realmAddress, shardingKeys != null ? shardingKeys : Collections.emptyList()); + } + } + } + + zkClient.close(); + return routingData; + } } diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java index a217245..d6c6713 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java @@ -20,6 +20,7 @@ package org.apache.helix.zookeeper.constant; */ import java.util.Collection; +import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableList; @@ -34,12 +35,15 @@ public class TestConstants { public static final String ZK_PREFIX = "localhost:"; public static final int ZK_START_PORT = 2127; + public static final List<String> TEST_KEY_LIST_1 = + ImmutableList.of("/sharding-key-0", "/sharding-key-1", "/sharding-key-2"); + public static final List<String> TEST_KEY_LIST_2 = + ImmutableList.of("/sharding-key-3", "/sharding-key-4", "/sharding-key-5"); + public static final List<String> TEST_KEY_LIST_3 = + ImmutableList.of("/sharding-key-6", "/sharding-key-7", "/sharding-key-8"); + // Based on the ZK hostname constants, construct a set of fake routing data mappings public static final Map<String, Collection<String>> FAKE_ROUTING_DATA = ImmutableMap - .of(ZK_PREFIX + ZK_START_PORT, - ImmutableList.of("/sharding-key-0", "/sharding-key-1", "/sharding-key-2"), - ZK_PREFIX + (ZK_START_PORT + 1), - 
ImmutableList.of("/sharding-key-3", "/sharding-key-4", "/sharding-key-5"), - ZK_PREFIX + (ZK_START_PORT + 2), - ImmutableList.of("/sharding-key-6", "/sharding-key-7", "/sharding-key-8")); + .of(ZK_PREFIX + ZK_START_PORT, TEST_KEY_LIST_1, ZK_PREFIX + (ZK_START_PORT + 1), + TEST_KEY_LIST_2, ZK_PREFIX + (ZK_START_PORT + 2), TEST_KEY_LIST_3); } diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java index 68f8df4..cc1936b 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java @@ -1,4 +1,77 @@ package org.apache.helix.zookeeper.routing; -public class TestZkRoutingDataReader { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.List; +import java.util.Map; + +import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; +import org.apache.helix.zookeeper.constant.TestConstants; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.ZkTestBase; +import org.apache.helix.zookeeper.zkclient.ZkClient; +import org.apache.helix.zookeeper.zkclient.ZkServer; +import org.apache.zookeeper.CreateMode; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestZkRoutingDataReader extends ZkTestBase { + private static final String ROUTING_ZK_ADDR = "localhost:2358"; + private ZkServer _routingZk; + + @BeforeClass + public void beforeClass() { + // Start a separate ZK for isolation + _routingZk = startZkServer(ROUTING_ZK_ADDR); + + // Create ZK realm routing data ZNRecord + ZNRecord znRecord = new ZNRecord(ROUTING_ZK_ADDR); + znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, + TestConstants.TEST_KEY_LIST_1); + + // Write raw routing data to ZK + ZkClient zkClient = _routingZk.getZkClient(); + zkClient.setZkSerializer(new ZNRecordSerializer()); + zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, null, CreateMode.PERSISTENT); + zkClient + .create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + ROUTING_ZK_ADDR, znRecord, + CreateMode.PERSISTENT); + } + + @AfterClass + public void afterClass() { + _routingZk.shutdown(); + } + + @Test + public void testGetRawRoutingData() { + Map<String, List<String>> rawRoutingData = + new ZkRoutingDataReader().getRawRoutingData(ROUTING_ZK_ADDR); + + // Check that the returned content matches up with what we expect (1 realm, 3 keys) + Assert.assertEquals(rawRoutingData.size(), 1); + Assert.assertTrue(rawRoutingData.containsKey(ROUTING_ZK_ADDR)); + 
Assert.assertEquals(rawRoutingData.get(ROUTING_ZK_ADDR), TestConstants.TEST_KEY_LIST_1); + } }
