[
https://issues.apache.org/jira/browse/STORM-1257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117360#comment-15117360
]
ASF GitHub Bot commented on STORM-1257:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1047#discussion_r50847048
--- Diff: storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.zookeeper;
+
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.RT;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.Config;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ZookeeperAuthInfo;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Zookeeper {
+
+ private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root) {
+ return mkClient(conf, servers, port, root, new
DefaultWatcherCallBack());
+ }
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, Map authConf) {
+ return mkClient(conf, servers, port, "", new
DefaultWatcherCallBack(), authConf);
+ }
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root, Map authConf) {
+ return mkClient(conf, servers, port, root, new
DefaultWatcherCallBack(), authConf);
+ }
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root, final WatcherCallBack watcher, Map authConf)
{
+ CuratorFramework fk;
+ if (authConf != null) {
+ fk = Utils.newCurator(conf, servers, port, root, new
ZookeeperAuthInfo(authConf));
+ } else {
+ fk = Utils.newCurator(conf, servers, port, root);
+ }
+
+ fk.getCuratorListenable().addListener(new CuratorListener() {
+ @Override
+ public void eventReceived(CuratorFramework _fk, CuratorEvent
e) throws Exception {
+ if (e.getType().equals(CuratorEventType.WATCHED)) {
+ WatchedEvent event = e.getWatchedEvent();
+
+ watcher.execute(event.getState(), event.getType(),
event.getPath());
+ }
+
+ }
+ });
+ fk.start();
+ return fk;
+ }
+
+ /**
+ * connect ZK, register Watch/unhandle Watch
+ *
+ * @return
+ */
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root, final WatcherCallBack watcher) {
+
+ return mkClient(conf, servers, port, root, watcher, null);
+ }
+
+ public static String createNode(CuratorFramework zk, String path,
byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls)
+ throws RuntimeException {
+
+ String ret = null;
+ try {
+ String npath = Utils.normalizePath(path);
+ ret = zk.create().withMode(mode).withACL(acls).forPath(npath,
data);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ return ret;
+ }
+
+ public static String createNode(CuratorFramework zk, String path,
byte[] data, List<ACL> acls) throws RuntimeException {
+ return createNode(zk, path, data,
org.apache.zookeeper.CreateMode.PERSISTENT, acls);
+ }
+
+ public static boolean existsNode(CuratorFramework zk, String path,
boolean watch) throws RuntimeException {
+ Stat stat = null;
+ try {
+ if (watch) {
+ stat =
zk.checkExists().watched().forPath(Utils.normalizePath(path));
+ } else {
+ stat = zk.checkExists().forPath(Utils.normalizePath(path));
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ return stat != null;
+ }
+
+ public static void deleteNode(CuratorFramework zk, String path) throws
RuntimeException {
+ try {
+ String npath = Utils.normalizePath(path);
+ if (existsNode(zk, npath, false)) {
+
zk.delete().deletingChildrenIfNeeded().forPath(Utils.normalizePath(path));
+ }
+
+ } catch (KeeperException.NoNodeException e) {
+ LOG.info("exception", e);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public static void mkdirs(CuratorFramework zk, String path, List<ACL>
acls) throws RuntimeException {
+
+ String npath = Utils.normalizePath(path);
+ if (npath.equals("/")) {
+ return;
+ }
+ if (existsNode(zk, npath, false)) {
+ return;
+ }
+ byte[] byteArray = new byte[1];
+ byteArray[0] = (byte) 7;
+ createNode(zk, npath, byteArray,
org.apache.zookeeper.CreateMode.PERSISTENT, acls);
--- End diff --
In this place we do need to do something like the try-cause that was in the
original code. There is a race condition where multiple clients can be doing a
mkdirs. If that happens we need to ignore the NodeExistsException from
createNode, but createNode will have wrapped it in a RuntimeException. If you
want to avoid the wrapping/unwrapping and just catch the exception yourself
without unwrapping it you could do the following instead.
```
try {
zk.create().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(npath,
byteArray);
} catch (KeeperException.NodeExistsException ignored) {
// this can happen when multiple clients doing mkdir at same time
}
```
> port backtype.storm.zookeeper to java
> -------------------------------------
>
> Key: STORM-1257
> URL: https://issues.apache.org/jira/browse/STORM-1257
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: John Fang
> Labels: java-migration, jstorm-merger
>
> A wrapper around zookeeper/curator.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)