This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 16c5f42790 DRILL-8275: Prevent the JDBC Client from creating spurious
paths in Zookeeper (#2617)
16c5f42790 is described below
commit 16c5f4279007301325c121e236fa47f2ede5f9c2
Author: luoc <[email protected]>
AuthorDate: Mon Sep 5 13:48:11 2022 +0800
DRILL-8275: Prevent the JDBC Client from creating spurious paths in
Zookeeper (#2617)
---
.../drill/exec/coord/zk/ZKClusterCoordinator.java | 36 +++++++++++++-----
.../org/apache/drill/exec/server/Drillbit.java | 3 +-
.../drill/exec/client/DrillClientStateTest.java | 43 ++++++++++++++++++++++
3 files changed, 71 insertions(+), 11 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index aeb0dd0f20..108ca770c6 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import java.util.regex.Pattern;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Throwables;
+import org.apache.zookeeper.data.Stat;
import org.apache.commons.collections.keyvalue.MultiKey;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@@ -40,6 +42,7 @@ import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -78,15 +81,17 @@ public class ZKClusterCoordinator extends
ClusterCoordinator {
private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new
ConcurrentHashMap<MultiKey,DrillbitEndpoint>();
private static final Pattern ZK_COMPLEX_STRING =
Pattern.compile("(^.*?)/(.*)/([^/]*)$");
- public ZKClusterCoordinator(DrillConfig config, String connect) {
- this(config, connect, new DefaultACLProvider());
+ public ZKClusterCoordinator(DrillConfig config, String connect) throws
Exception {
+ // A client must not create the namespace (aka zkRoot).
+ this(config, connect, false, new DefaultACLProvider());
}
- public ZKClusterCoordinator(DrillConfig config, ACLProvider aclProvider) {
- this(config, null, aclProvider);
+ public ZKClusterCoordinator(DrillConfig config, ACLProvider aclProvider)
throws Exception {
+ // A Drillbit is required to create the zkRoot.
+ this(config, null, true, aclProvider);
}
- public ZKClusterCoordinator(DrillConfig config, String connect, ACLProvider
aclProvider) {
+ public ZKClusterCoordinator(DrillConfig config, String connect, boolean
createNamespace, ACLProvider aclProvider) throws Exception {
connect = connect == null || connect.isEmpty() ?
config.getString(ExecConstants.ZK_CONNECTION) : connect;
String clusterId = config.getString(ExecConstants.SERVICE_NAME);
@@ -106,13 +111,23 @@ public class ZKClusterCoordinator extends
ClusterCoordinator {
RetryPolicy rp = new
RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
config.getInt(ExecConstants.ZK_RETRY_DELAY));
- curator = CuratorFrameworkFactory.builder()
- .namespace(zkRoot)
+
+ CuratorFrameworkFactory.Builder curatorBuilder =
CuratorFrameworkFactory.builder()
.connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
.retryPolicy(rp)
.connectString(connect)
- .aclProvider(aclProvider)
- .build();
+ .aclProvider(aclProvider);
+
+ if (!createNamespace) { // For a client connecting via ZK, the root path must already
exist.
+ try (CuratorFramework client = curatorBuilder.build()) {
+ client.start();
+ Stat stat = client.checkExists().forPath(ZKPaths.PATH_SEPARATOR +
zkRoot);
+ Objects.requireNonNull(stat, "The root path does not exist in the
Zookeeper.");
+ }
+ }
+
+ curator = curatorBuilder.namespace(zkRoot).build();
+
curator.getConnectionStateListenable().addListener(new
InitialConnectionListener());
curator.start();
discovery = newDiscovery();
@@ -219,6 +234,7 @@ public class ZKClusterCoordinator extends
ClusterCoordinator {
* triggered. State information is used during planning and
* initial client connection phases.
*/
+ @Override
public RegistrationHandle update(RegistrationHandle handle, State state) {
ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
try {
@@ -348,7 +364,7 @@ public class ZKClusterCoordinator extends
ClusterCoordinator {
protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() {
return ServiceDiscoveryBuilder
.builder(DrillbitEndpoint.class)
- .basePath("/")
+ .basePath(ZKPaths.PATH_SEPARATOR)
.client(curator)
.serializer(DrillServiceInstanceHelper.SERIALIZER)
.build();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index a66862d013..8228550d67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.server;
import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.utils.ZKPaths;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.StackTrace;
import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -179,7 +180,7 @@ public class Drillbit implements AutoCloseable {
} else {
String clusterId = config.getString(ExecConstants.SERVICE_NAME);
String zkRoot = config.getString(ExecConstants.ZK_ROOT);
- String drillClusterPath = "/" + zkRoot + "/" + clusterId;
+ String drillClusterPath = ZKPaths.PATH_SEPARATOR + zkRoot +
ZKPaths.PATH_SEPARATOR + clusterId;
ACLProvider aclProvider = ZKACLProviderFactory.getACLProvider(config,
drillClusterPath, context);
coord = new ZKClusterCoordinator(config, aclProvider);
storeProvider = new PersistentStoreRegistry<>(this.coord,
config).newPStoreProvider();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientStateTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientStateTest.java
new file mode 100644
index 0000000000..0a25e6e8d4
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientStateTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.client;
+
+import org.apache.curator.utils.ZKPaths;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.TestWithZookeeper;
+import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
+import org.junit.Test;
+
+public class DrillClientStateTest extends TestWithZookeeper {
+
+ @Test
+ public void testNotExistZkRoot() throws Exception {
+ // No drillbit has been started, so the root path has not been created in
ZK.
+ thrownException.expect(NullPointerException.class);
+ thrownException.expectMessage("root path does not exist");
+ DrillConfig config = DrillConfig.create();
+ String connString = zkHelper.getConnectionString();
+ String zkRoot = config.getString(ExecConstants.ZK_ROOT); // does not exist
+ connString = connString + ZKPaths.PATH_SEPARATOR + zkRoot;
+ try (ZKClusterCoordinator coordinator = new ZKClusterCoordinator(config,
connString)) {
+ coordinator.start(10000);
+ }
+ }
+
+}