[
https://issues.apache.org/jira/browse/STORM-1257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117388#comment-15117388
]
ASF GitHub Bot commented on STORM-1257:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1047#discussion_r50849882
--- Diff: storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.zookeeper;
+
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.RT;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.Config;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ZookeeperAuthInfo;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Zookeeper {
+
+ private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root) {
+ return mkClient(conf, servers, port, root, new
DefaultWatcherCallBack());
+ }
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, Map authConf) {
+ return mkClient(conf, servers, port, "", new
DefaultWatcherCallBack(), authConf);
+ }
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root, Map authConf) {
+ return mkClient(conf, servers, port, root, new
DefaultWatcherCallBack(), authConf);
+ }
+
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root, final WatcherCallBack watcher, Map authConf)
{
+ CuratorFramework fk;
+ if (authConf != null) {
+ fk = Utils.newCurator(conf, servers, port, root, new
ZookeeperAuthInfo(authConf));
+ } else {
+ fk = Utils.newCurator(conf, servers, port, root);
+ }
+
+ fk.getCuratorListenable().addListener(new CuratorListener() {
+ @Override
+ public void eventReceived(CuratorFramework _fk, CuratorEvent
e) throws Exception {
+ if (e.getType().equals(CuratorEventType.WATCHED)) {
+ WatchedEvent event = e.getWatchedEvent();
+
+ watcher.execute(event.getState(), event.getType(),
event.getPath());
+ }
+
+ }
+ });
+ fk.start();
+ return fk;
+ }
+
+ /**
+ * connect ZK, register Watch/unhandle Watch
+ *
+ * @return
+ */
+ public static CuratorFramework mkClient(Map conf, List<String>
servers, Object port, String root, final WatcherCallBack watcher) {
+
+ return mkClient(conf, servers, port, root, watcher, null);
+ }
+
+ public static String createNode(CuratorFramework zk, String path,
byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls)
+ throws RuntimeException {
+
+ String ret = null;
+ try {
+ String npath = Utils.normalizePath(path);
+ ret = zk.create().withMode(mode).withACL(acls).forPath(npath,
data);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ return ret;
+ }
+
+ public static String createNode(CuratorFramework zk, String path,
byte[] data, List<ACL> acls) throws RuntimeException {
+ return createNode(zk, path, data,
org.apache.zookeeper.CreateMode.PERSISTENT, acls);
+ }
+
+ public static boolean existsNode(CuratorFramework zk, String path,
boolean watch) throws RuntimeException {
+ Stat stat = null;
+ try {
+ if (watch) {
+ stat =
zk.checkExists().watched().forPath(Utils.normalizePath(path));
+ } else {
+ stat = zk.checkExists().forPath(Utils.normalizePath(path));
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ return stat != null;
+ }
+
+ public static void deleteNode(CuratorFramework zk, String path) throws
RuntimeException {
+ try {
+ String npath = Utils.normalizePath(path);
+ if (existsNode(zk, npath, false)) {
+
zk.delete().deletingChildrenIfNeeded().forPath(Utils.normalizePath(path));
+ }
+
+ } catch (KeeperException.NoNodeException e) {
+ LOG.info("exception", e);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public static void mkdirs(CuratorFramework zk, String path, List<ACL>
acls) throws RuntimeException {
+
+ String npath = Utils.normalizePath(path);
+ if (npath.equals("/")) {
+ return;
+ }
+ if (existsNode(zk, npath, false)) {
+ return;
+ }
+ byte[] byteArray = new byte[1];
+ byteArray[0] = (byte) 7;
+ createNode(zk, npath, byteArray,
org.apache.zookeeper.CreateMode.PERSISTENT, acls);
+
+ }
+
+ public static void syncPath(CuratorFramework zk, String path) throws
RuntimeException {
+ try {
+ zk.sync().forPath(Utils.normalizePath(path));
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public static void addListener(CuratorFramework zk,
ConnectionStateListener listener) {
+ zk.getConnectionStateListenable().addListener(listener);
+ }
+
+ public static byte[] getData(CuratorFramework zk, String path, boolean
watch) throws RuntimeException {
+
+ try {
+ String npath = Utils.normalizePath(path);
+ if (existsNode(zk, npath, watch)) {
+ if (watch) {
+ return zk.getData().watched().forPath(npath);
+ } else {
+ return zk.getData().forPath(npath);
+ }
+ }
+ } catch (KeeperException e) {
+ // this is fine b/c we still have a watch from the successful
exists call
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ return null;
+ }
+
+ public static Integer getVersion(CuratorFramework zk, String path,
boolean watch) throws Exception {
+ String npath = Utils.normalizePath(path);
+ Stat stat = null;
+ if (existsNode(zk, npath, watch)) {
+ if (watch) {
+ stat =
zk.checkExists().watched().forPath(Utils.normalizePath(path));
+ } else {
+ stat = zk.checkExists().forPath(Utils.normalizePath(path));
+ }
+ return Integer.valueOf(stat.getVersion());
+ }
+
+ return null;
+ }
+
+ public static List<String> getChildren(CuratorFramework zk, String
path, boolean watch) throws RuntimeException {
+
+ try {
+ String npath = Utils.normalizePath(path);
+ if (watch) {
+ return zk.getChildren().watched().forPath(npath);
+ } else {
+ return zk.getChildren().forPath(npath);
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ // Deletes the state inside the zookeeper for a key, for which the
+ // contents of the key starts with nimbus host port information
+ public static void deleteDodeBlobstore(CuratorFramework zk, String
parentPath, String hostPortInfo) throws RuntimeException {
+ String parentnPath = Utils.normalizePath(parentPath);
--- End diff --
`parentnPath` seems too close to `parentPath` for me to easily distinguish
between the two. Could you change it to `normalizedPatentPath`, or just
`nParentPath`.
> port backtype.storm.zookeeper to java
> -------------------------------------
>
> Key: STORM-1257
> URL: https://issues.apache.org/jira/browse/STORM-1257
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: John Fang
> Labels: java-migration, jstorm-merger
>
> A wrapper around zookeeper/curator.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)