zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26ffa0db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26ffa0db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26ffa0db

Branch: refs/heads/ignite-zk
Commit: 26ffa0dbbfe2276f6070bb22844cb6102738ee23
Parents: 0b78f31
Author: sboikov <[email protected]>
Authored: Fri Dec 29 12:27:01 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Dec 29 12:27:01 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZkAliveNodeData.java  |    3 +
 .../discovery/zk/internal/ZkIgnitePaths.java    |    2 +-
 .../discovery/zk/internal/ZkRuntimeState.java   |   19 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  154 +-
 .../ZookeeperDiscoverySpiBasicTest.java         | 4247 -----------------
 .../zk/internal/ZookeeperDiscoverySpiTest.java  | 4311 ++++++++++++++++++
 .../testframework/junits/GridAbstractTest.java  |    6 +-
 7 files changed, 4459 insertions(+), 4283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
index 45f453f..9574325 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
@@ -30,6 +30,9 @@ public class ZkAliveNodeData implements Serializable {
     /** */
     long lastProcEvt = -1;
 
+    /** */
+    transient boolean needUpdate;
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ZkAliveNodeData.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 4ba6de2..44b247c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -27,7 +27,7 @@ class ZkIgnitePaths {
     static final String PATH_SEPARATOR = "/";
 
     /** */
-    static final byte CLIENT_NODE_FLAG_MASK = 0x01;
+    private static final byte CLIENT_NODE_FLAG_MASK = 0x01;
 
     /** */
     private static final int UUID_LEN = 36;

http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index e61e2b2..774c2a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -49,12 +49,6 @@ class ZkRuntimeState {
     int joinDataPartCnt;
 
     /** */
-    ZkTimeoutObject joinErrTimeoutObj;
-
-    /** */
-    ZkTimeoutObject joinTimeoutObj;
-
-    /** */
     long gridStartTime;
 
     /** */
@@ -70,7 +64,7 @@ class ZkRuntimeState {
     String locNodeZkPath;
 
     /** */
-    ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+    final ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
 
     /** */
     int procEvtCnt;
@@ -81,6 +75,17 @@ class ZkRuntimeState {
     /** */
     List<ClusterNode> commErrProcNodes;
 
+    /** Timeout callback registering watcher for join error
+     * (set this watcher after timeout as a minor optimization).
+     */
+    ZkTimeoutObject joinErrTo;
+
+    /** Timeout callback set to wait for join timeout. */
+    ZkTimeoutObject joinTo;
+
+    /** Timeout callback to update processed events counter. */
+    ZkTimeoutObject procEvtsUpdateTo;
+
     /**
      * @param prevJoined {@code True} if joined topology before reconnect attempt.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index ad7da00..8f717c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -106,6 +106,9 @@ public class ZookeeperDiscoveryImpl {
     static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
 
     /** */
+    static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT";
+
+    /** */
     static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS";
 
     /** */
@@ -991,10 +994,10 @@ public class ZookeeperDiscoveryImpl {
              */
             CheckJoinErrorWatcher joinErrorWatcher = new CheckJoinErrorWatcher(5000, joinDataPath, rtState);
 
-            rtState.joinErrTimeoutObj = joinErrorWatcher.timeoutObj;
+            rtState.joinErrTo = joinErrorWatcher.timeoutObj;
 
             if (locNode.isClient() && spi.getJoinTimeout() > 0) {
-                ZkTimeoutObject joinTimeoutObj = prevState != null ? prevState.joinTimeoutObj : null;
+                ZkTimeoutObject joinTimeoutObj = prevState != null ? prevState.joinTo : null;
 
                 if (joinTimeoutObj == null) {
                     joinTimeoutObj = new JoinTimeoutObject(spi.getJoinTimeout());
@@ -1002,7 +1005,7 @@ public class ZookeeperDiscoveryImpl {
                     spi.getSpiContext().addTimeoutObject(joinTimeoutObj);
                 }
 
-                rtState.joinTimeoutObj = joinTimeoutObj;
+                rtState.joinTo = joinTimeoutObj;
             }
 
             if (!locNode.isClient())
@@ -1010,7 +1013,7 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher);
 
-            spi.getSpiContext().addTimeoutObject(rtState.joinErrTimeoutObj);
+            spi.getSpiContext().addTimeoutObject(rtState.joinErrTo);
         }
         catch (IgniteCheckedException | ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
@@ -1031,7 +1034,7 @@ public class ZookeeperDiscoveryImpl {
         try {
             SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
 
-            // Note: exception message test is checked in tests.
+            // Note: exception message is checked in tests.
             if (subj == null)
                 throw new IgniteSpiException("Authentication failed for local node.");
 
@@ -1109,6 +1112,33 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
+    private class UpdateProcessedEventsTimeoutObject extends ZkTimeoutObject {
+        /** */
+        private final ZkRuntimeState rtState;
+
+        /**
+         * @param rtState Runtime state.
+         * @param timeout Timeout.
+         */
+        UpdateProcessedEventsTimeoutObject(ZkRuntimeState rtState, long timeout) {
+            super(timeout);
+
+            this.rtState = rtState;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            runInWorkerThread(new ZkRunnable(rtState, ZookeeperDiscoveryImpl.this) {
+                @Override protected void run0() throws Exception {
+                    updateProcessedEventsOnTimeout(rtState, UpdateProcessedEventsTimeoutObject.this);
+                }
+            });
+        }
+    }
+
+    /**
+     *
+     */
     private class JoinTimeoutObject extends ZkTimeoutObject {
         /**
          * @param timeout Timeout.
@@ -1138,7 +1168,7 @@ public class ZookeeperDiscoveryImpl {
                         "are no alive server nodes (consider increasing 'joinTimeout' configuration  property) [" +
                         "joinTimeout=" + spi.getJoinTimeout() + ']');
 
-                    // Note: exception message test is checked in tests.
+                    // Note: exception message is checked in tests.
                     onSegmented(new IgniteSpiException("Failed to connect to cluster within configured timeout"));
                 }
             });
@@ -1801,6 +1831,7 @@ public class ZookeeperDiscoveryImpl {
             U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node +
                 ", existingNode=" + node0 + ']');
 
+            // Note: exception message is checked in tests.
             return new ZkNodeValidateResult("Node with the same ID already exists: " + node0);
         }
 
@@ -2055,7 +2086,7 @@ public class ZookeeperDiscoveryImpl {
 
         assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId;
 
-        spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj);
+        spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
 
         cleanupPreviousClusterData();
 
@@ -2423,6 +2454,7 @@ public class ZookeeperDiscoveryImpl {
 
         ZookeeperClient zkClient = rtState.zkClient;
 
+        boolean evtProcessed = false;
         boolean updateNodeInfo = false;
 
         for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
@@ -2441,6 +2473,8 @@ public class ZookeeperDiscoveryImpl {
                     assert locNode.id().equals(joinedId);
 
                     processLocalJoin(evtsData, evtData0);
+
+                    evtProcessed = true;
                 }
             }
             else {
@@ -2533,6 +2567,8 @@ public class ZookeeperDiscoveryImpl {
                     default:
                         assert false : "Invalid event: " + evtData;
                 }
+
+                evtProcessed = true;
             }
 
             if (rtState.joined) {
@@ -2547,26 +2583,90 @@ public class ZookeeperDiscoveryImpl {
 
         if (rtState.crd)
             handleProcessedEvents("procEvt");
-        else if (updateNodeInfo) {
-            assert rtState.locNodeZkPath != null;
+        else
+            onEventProcessed(rtState, updateNodeInfo, evtProcessed);
 
-            if (log.isDebugEnabled())
-                log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt);
+        ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
 
-            try {
-                zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1);
+        if (commErrFut != null)
+            commErrFut.onTopologyChange(rtState.top); // This can add new event, notify out of event process loop.
+    }
+
+    /**
+     * @param rtState Runtime state.
+     * @param updateNodeInfo {@code True} if need update processed events without delay.
+     * @param evtProcessed {@code True} if new event was processed.
+     * @throws Exception If failed.
+     */
+    private void onEventProcessed(ZkRuntimeState rtState,
+        boolean updateNodeInfo,
+        boolean evtProcessed) throws Exception
+    {
+        synchronized (stateMux) {
+            if (updateNodeInfo) {
+                assert rtState.locNodeZkPath != null;
+
+                if (log.isDebugEnabled())
+                    log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt);
+
+                updateProcessedEvents(rtState);
+
+                if (rtState.procEvtsUpdateTo != null) {
+                    spi.getSpiContext().removeTimeoutObject(rtState.procEvtsUpdateTo);
+
+                    rtState.procEvtsUpdateTo = null;
+                }
+            }
+            else if (evtProcessed) {
+                rtState.locNodeInfo.needUpdate = true;
+
+                if (rtState.procEvtsUpdateTo == null) {
+                    long updateTimeout = IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT,
+                        60_000);
+
+                    if (updateTimeout > 0) {
+                        rtState.procEvtsUpdateTo = new UpdateProcessedEventsTimeoutObject(rtState, updateTimeout);
+
+                        spi.getSpiContext().addTimeoutObject(rtState.procEvtsUpdateTo);
+                    }
+                }
             }
-            catch (KeeperException.NoNodeException e) {
-                // Possible if node is forcible failed.
+        }
+    }
+
+    /**
+     * @param rtState Runtime state.
+     * @param procEvtsUpdateTo Timeout object.
+     * @throws Exception If failed.
+     */
+    private void updateProcessedEventsOnTimeout(ZkRuntimeState rtState, ZkTimeoutObject procEvtsUpdateTo)
+        throws Exception
+    {
+        synchronized (stateMux) {
+            if (rtState.procEvtsUpdateTo == procEvtsUpdateTo && rtState.locNodeInfo.needUpdate) {
                 if (log.isDebugEnabled())
-                    log.debug("Failed to update processed events, no node: " + rtState.locNodeInfo.lastProcEvt);
+                    log.debug("Update processed events on timeout: " + rtState.locNodeInfo.lastProcEvt);
+
+                updateProcessedEvents(rtState);
             }
         }
+    }
 
-        ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
+    /**
+     * @param rtState Runtime state.
+     * @throws Exception If failed.
+     */
+    private void updateProcessedEvents(ZkRuntimeState rtState) throws Exception {
+        try {
+            rtState.zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1);
 
-        if (commErrFut != null)
-            commErrFut.onTopologyChange(rtState.top); // This can add new event, notify out of event process loop.
+            rtState.locNodeInfo.needUpdate = false;
+        }
+        catch (KeeperException.NoNodeException e) {
+            // Possible if node is forcible failed.
+            if (log.isDebugEnabled())
+                log.debug("Failed to update processed events, no node: " + rtState.locNodeInfo.lastProcEvt);
+        }
     }
 
     /**
@@ -2597,14 +2697,14 @@ public class ZookeeperDiscoveryImpl {
             if (connState == ConnectionState.STOPPED)
                 return;
 
-            if (rtState.joinTimeoutObj != null) {
-                spi.getSpiContext().removeTimeoutObject(rtState.joinTimeoutObj);
+            if (rtState.joinTo != null) {
+                spi.getSpiContext().removeTimeoutObject(rtState.joinTo);
 
-                rtState.joinTimeoutObj.cancelled = true;
-                rtState.joinTimeoutObj = null;
+                rtState.joinTo.cancelled = true;
+                rtState.joinTo = null;
             }
 
-            spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj);
+            spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
 
             if (log.isInfoEnabled())
                 log.info("Local join event data: " + evtData + ']');
@@ -3607,7 +3707,11 @@ public class ZookeeperDiscoveryImpl {
         return zip(marsh.marshal(obj));
     }
 
-    static byte[] zip(byte[] bytes) {
+    /**
+     * @param bytes Bytes to compress.
+     * @return Zip-compressed bytes.
+     */
+    private static byte[] zip(byte[] bytes) {
         Deflater deflater = new Deflater();
 
         deflater.setInput(bytes);

Reply via email to