Repository: ignite
Updated Branches:
  refs/heads/master 86465f585 -> 7402ea110


Added Affinity topology version and Pending exchanges to Visor data collector task.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7402ea11
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7402ea11
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7402ea11

Branch: refs/heads/master
Commit: 7402ea11051eb328702ca85d12477accfd22337c
Parents: 86465f5
Author: Alexey Kuznetsov <akuznet...@apache.org>
Authored: Fri May 19 14:18:23 2017 +0700
Committer: Alexey Kuznetsov <akuznet...@apache.org>
Committed: Fri May 19 14:18:23 2017 +0700

----------------------------------------------------------------------
 .../node/VisorAffinityTopologyVersion.java      | 87 ++++++++++++++++++++
 .../visor/node/VisorNodeDataCollectorJob.java   |  6 ++
 .../node/VisorNodeDataCollectorJobResult.java   | 38 +++++++++
 .../visor/node/VisorNodeDataCollectorTask.java  |  4 +
 .../node/VisorNodeDataCollectorTaskResult.java  | 28 ++++++-
 5 files changed, 162 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7402ea11/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorAffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorAffinityTopologyVersion.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorAffinityTopologyVersion.java
new file mode 100644
index 0000000..83b2fb1
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorAffinityTopologyVersion.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.node;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.VisorDataTransferObject;
+
+/**
+ * Data transfer object for {@link AffinityTopologyVersion}.
+ */
+public class VisorAffinityTopologyVersion extends VisorDataTransferObject {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version. */
+    private long topVer;
+
+    /** Minor topology version. */
+    private int minorTopVer;
+
+    /**
+     * Default constructor; leaves both version fields at their zero defaults.
+     */
+    public VisorAffinityTopologyVersion() {
+        // No-op.
+    }
+
+    /**
+     * Creates a data transfer object holding the topology and minor topology versions of the given version.
+     *
+     * @param affTopVer Affinity topology version to copy from; dereferenced without a null check.
+     */
+    public VisorAffinityTopologyVersion(AffinityTopologyVersion affTopVer) {
+        topVer = affTopVer.topologyVersion();
+        minorTopVer = affTopVer.minorTopologyVersion();
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public long getTopologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Minor topology version.
+     */
+    public int getMinorTopologyVersion() {
+        return minorTopVer;
+    }
+
+    /** {@inheritDoc} Writes the topology version as a long, then the minor topology version as an int. */
+    @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
+        out.writeLong(topVer);
+        out.writeInt(minorTopVer);
+    }
+
+    /** {@inheritDoc} Reads the fields in the order they are written; {@code protoVer} is not consulted. */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) 
throws IOException, ClassNotFoundException {
+        topVer = in.readLong();
+        minorTopVer = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorAffinityTopologyVersion.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7402ea11/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index 125e219..573cc83 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
 import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
@@ -242,6 +243,11 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
         VisorNodeDataCollectorTaskArg arg) {
         res.setGridName(ignite.name());
 
+        GridCachePartitionExchangeManager<Object, Object> exchange = 
ignite.context().cache().context().exchange();
+
+        res.setReadyAffinityVersion(new 
VisorAffinityTopologyVersion(exchange.readyAffinityVersion()));
+        res.setHasPendingExchange(exchange.hasPendingExchange());
+
         res.setTopologyVersion(ignite.cluster().topologyVersion());
 
         long start0 = U.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7402ea11/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
index 41f9468..761d0ff 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
@@ -70,6 +70,12 @@ public class VisorNodeDataCollectorJobResult extends 
VisorDataTransferObject {
     /** Errors count. */
     private long errCnt;
 
+    /** Topology version of latest completed partition exchange. */
+    private VisorAffinityTopologyVersion readyTopVer;
+
+    /** Whether pending exchange future exists. */
+    private boolean hasPendingExchange;
+
     /**
      * Default constructor.
      */
@@ -203,6 +209,34 @@ public class VisorNodeDataCollectorJobResult extends 
VisorDataTransferObject {
         this.errCnt = errCnt;
     }
 
+    /**
+     * @return Topology version of latest completed partition exchange, or {@code null} if it was never set.
+     */
+    public VisorAffinityTopologyVersion getReadyAffinityVersion() {
+        return readyTopVer;
+    }
+
+    /**
+     * @param readyTopVer Topology version of latest completed partition 
exchange.
+     */
+    public void setReadyAffinityVersion(VisorAffinityTopologyVersion 
readyTopVer) {
+        this.readyTopVer = readyTopVer;
+    }
+
+    /**
+     * @return Whether pending exchange future exists.
+     */
+    public boolean isHasPendingExchange() {
+        return hasPendingExchange;
+    }
+
+    /**
+     * @param hasPendingExchange Whether pending exchange future exists.
+     */
+    public void setHasPendingExchange(boolean hasPendingExchange) {
+        this.hasPendingExchange = hasPendingExchange;
+    }
+
     /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
         U.writeString(out, gridName);
@@ -216,6 +250,8 @@ public class VisorNodeDataCollectorJobResult extends 
VisorDataTransferObject {
         U.writeCollection(out, igfsEndpoints);
         out.writeObject(igfssEx);
         out.writeLong(errCnt);
+        out.writeObject(readyTopVer);
+        out.writeBoolean(hasPendingExchange);
     }
 
     /** {@inheritDoc} */
@@ -231,6 +267,8 @@ public class VisorNodeDataCollectorJobResult extends 
VisorDataTransferObject {
         igfsEndpoints = U.readList(in);
         igfssEx = (Throwable)in.readObject();
         errCnt = in.readLong();
+        readyTopVer = (VisorAffinityTopologyVersion)in.readObject();
+        hasPendingExchange = in.readBoolean();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7402ea11/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
index a317ffe..2790dec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
@@ -113,5 +113,9 @@ public class VisorNodeDataCollectorTask extends 
VisorMultiNodeTask<VisorNodeData
 
         if (jobRes.getIgfssEx() != null)
             taskRes.getIgfssEx().put(nid, new 
VisorExceptionWrapper(jobRes.getIgfssEx()));
+
+        taskRes.getReadyAffinityVersions().put(nid, 
jobRes.getReadyAffinityVersion());
+
+        taskRes.getPendingExchanges().put(nid, jobRes.isHasPendingExchange());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7402ea11/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
index 669cc80..8d1fe8e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
@@ -81,6 +81,12 @@ public class VisorNodeDataCollectorTaskResult extends 
VisorDataTransferObject {
     /** Exceptions caught during collecting IGFS from nodes. */
     private Map<UUID, VisorExceptionWrapper> igfssEx = new HashMap<>();
 
+    /** Topology version of latest completed partition exchange from nodes. */
+    private Map<UUID, VisorAffinityTopologyVersion> readyTopVers = new 
HashMap<>();
+
+    /** Whether pending exchange future exists from nodes. */
+    private Map<UUID, Boolean> pendingExchanges = new HashMap<>();
+
     /**
      * Default constructor.
      */
@@ -103,7 +109,9 @@ public class VisorNodeDataCollectorTaskResult extends 
VisorDataTransferObject {
             cachesEx.isEmpty() &&
             igfss.isEmpty() &&
             igfsEndpoints.isEmpty() &&
-            igfssEx.isEmpty();
+            igfssEx.isEmpty() &&
+            readyTopVers.isEmpty() &&
+            pendingExchanges.isEmpty();
     }
 
     /**
@@ -204,6 +212,20 @@ public class VisorNodeDataCollectorTaskResult extends 
VisorDataTransferObject {
         return errCnts;
     }
 
+    /**
+     * @return Topology version of latest completed partition exchange from 
nodes.
+     */
+    public Map<UUID, VisorAffinityTopologyVersion> getReadyAffinityVersions() {
+        return readyTopVers;
+    }
+
+    /**
+     * @return Whether pending exchange future exists from nodes.
+     */
+    public Map<UUID, Boolean> getPendingExchanges() {
+        return pendingExchanges;
+    }
+
     /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
         out.writeBoolean(active);
@@ -219,6 +241,8 @@ public class VisorNodeDataCollectorTaskResult extends 
VisorDataTransferObject {
         U.writeMap(out, igfss);
         U.writeMap(out, igfsEndpoints);
         U.writeMap(out, igfssEx);
+        U.writeMap(out, readyTopVers);
+        U.writeMap(out, pendingExchanges);
     }
 
     /** {@inheritDoc} */
@@ -236,6 +260,8 @@ public class VisorNodeDataCollectorTaskResult extends 
VisorDataTransferObject {
         igfss = U.readMap(in);
         igfsEndpoints = U.readMap(in);
         igfssEx = U.readMap(in);
+        readyTopVers = U.readMap(in);
+        pendingExchanges = U.readMap(in);
     }
 
     /** {@inheritDoc} */

Reply via email to