[
https://issues.apache.org/jira/browse/CURATOR-287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105893#comment-15105893
]
ASF GitHub Bot commented on CURATOR-287:
----------------------------------------
Github user cammckenzie commented on a diff in the pull request:
https://github.com/apache/curator/pull/123#discussion_r50050679
--- Diff:
curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
---
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ * A persistent node is a node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and
design
+ * </p>
+ */
+public class PersistentNode implements Closeable
+{
+ private final AtomicReference<CountDownLatch> initialCreateLatch = new
AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final CreateModable<ACLBackgroundPathAndBytesable<String>>
createMethod;
+ private final AtomicReference<String> nodePath = new
AtomicReference<String>(null);
+ private final String basePath;
+ private final CreateMode mode;
+ private final AtomicReference<byte[]> data = new
AtomicReference<byte[]>();
+ private final AtomicReference<State> state = new
AtomicReference<State>(State.LATENT);
+ private final AtomicBoolean authFailure = new AtomicBoolean(false);
+ private final BackgroundCallback backgroundCallback;
+ private final boolean useProtection;
+ private final CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == EventType.NodeDeleted )
+ {
+ createNode();
+ }
+ else if ( event.getType() == EventType.NodeDataChanged )
+ {
+ watchNode();
+ }
+ }
+ };
+ private final BackgroundCallback checkExistsCallback = new
BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent
event) throws Exception
+ {
+ if ( event.getResultCode() ==
KeeperException.Code.NONODE.intValue() )
+ {
+ createNode();
+ }
+ else
+ {
+ boolean isEphemeral = event.getStat().getEphemeralOwner()
!= 0;
+ if ( isEphemeral != mode.isEphemeral() )
+ {
+ log.warn("Existing node ephemeral state doesn't match
requested state. Maybe the node was created outside of PersistentNode? " +
basePath);
+ }
+ }
+ }
+ };
+ private final BackgroundCallback setDataCallback = new
BackgroundCallback()
+ {
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent
event)
+ throws Exception
+ {
+ //If the result is ok then initialisation is complete (if
we're still initialising)
+ //Don't retry on other errors as the only recoverable cases
will be connection loss
+ //and the node not existing, both of which are already handled
by other watches.
+ if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ //Update is ok, mark initialisation as complete if
required.
+ initialisationComplete();
+ }
+ }
+ };
+ private final ConnectionStateListener connectionStateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState
newState)
+ {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ createNode();
+ }
+ }
+ };
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * @param client client instance
+ * @param mode creation mode
+ * @param useProtection if true, call {@link
CreateBuilder#withProtection()}
+ * @param basePath the base path for the node
+ * @param initData data for the node
+ */
+ public PersistentNode(CuratorFramework client, final CreateMode mode,
boolean useProtection, final String basePath, byte[] initData)
+ {
+ this.useProtection = useProtection;
+ this.client = Preconditions.checkNotNull(client, "client cannot be
null");
+ this.basePath = PathUtils.validatePath(basePath);
+ this.mode = Preconditions.checkNotNull(mode, "mode cannot be
null");
+ final byte[] data = Preconditions.checkNotNull(initData, "data
cannot be null");
+
+ backgroundCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client,
CuratorEvent event) throws Exception
+ {
+ String path = null;
+ boolean nodeExists = false;
+ if ( event.getResultCode() ==
KeeperException.Code.NODEEXISTS.intValue() )
+ {
+ path = event.getPath();
+ nodeExists = true;
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ path = event.getName();
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.NOAUTH.intValue() )
+ {
+ log.warn("Client does not have authorisation to write
ephemeral node at path {}", event.getPath());
+ authFailure.set(true);
+ return;
+ }
+ if ( path != null )
+ {
+ authFailure.set(false);
+ nodePath.set(path);
+ watchNode();
+
+ if ( nodeExists )
+ {
+
client.setData().inBackground(setDataCallback).forPath(getActualPath(),
getData());
+ }
+ else
+ {
+ initialisationComplete();
+ }
+ }
+ else
+ {
+ createNode();
+ }
+ }
+ };
+
+ createMethod = useProtection ?
client.create().creatingParentContainersIfNeeded().withProtection() :
client.create().creatingParentContainersIfNeeded();
+ this.data.set(Arrays.copyOf(data, data.length));
+ }
+
+ private void initialisationComplete()
+ {
+ CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+ if ( localLatch != null )
+ {
+ localLatch.countDown();
+ }
+ }
+
+ /**
+ * You must call start() to initiate the persistent ephemeral node. An
attempt to create the node
+ * in the background will be started
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT,
State.STARTED), "Already started");
+
+
client.getConnectionStateListenable().addListener(connectionStateListener);
+ createNode();
+ }
+
+ /**
+ * Block until the either initial node creation initiated by {@link
#start()} succeeds or
+ * the timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit time unit
+ * @return if the node was created before timeout
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public boolean waitForInitialCreate(long timeout, TimeUnit unit)
throws InterruptedException
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not
started");
+
+ CountDownLatch localLatch = initialCreateLatch.get();
+ return (localLatch == null) || localLatch.await(timeout, unit);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ return;
+ }
+
+
client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+ try
+ {
+ deleteNode();
+ }
+ catch ( Exception e )
+ {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Returns the currently set path or null if the node does not exist
+ *
+ * @return node path or null
+ */
+ public String getActualPath()
+ {
+ return nodePath.get();
+ }
+
+ /**
+ * Set data that ephemeral node should set in ZK also writes the data
to the node
--- End diff --
Javadoc should not refer to ephemeral
> PersistentEphemeralNode should be generalized to accept all create modes
> ------------------------------------------------------------------------
>
> Key: CURATOR-287
> URL: https://issues.apache.org/jira/browse/CURATOR-287
> Project: Apache Curator
> Issue Type: New Feature
> Components: Recipes
> Affects Versions: 3.0.0, 2.9.1
> Reporter: Jordan Zimmerman
> Assignee: Jordan Zimmerman
> Priority: Minor
>
> With very little change, PersistentEphemeralNode could work with
> non-ephemeral nodes. There is a good use case for this: permanent nodes that
> must always exist with some data. It's actually a pain to do this manually
> with ZK.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)