IGNITE-10564 Added tasks to collect rebalance metrics. Co-authored-by: Alexey Kuznetsov <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ececcbcd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ececcbcd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ececcbcd Branch: refs/heads/ignite-10189 Commit: ececcbcd1883a2cda5a2eb12e0f0deffddf6b829 Parents: 4c56adc Author: Vasiliy Sisko <[email protected]> Authored: Thu Dec 6 11:30:08 2018 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Thu Dec 6 11:30:08 2018 +0700 ---------------------------------------------------------------------- .../VisorCacheRebalanceCollectorJobResult.java | 91 +++++++++ .../node/VisorCacheRebalanceCollectorTask.java | 194 +++++++++++++++++++ .../VisorCacheRebalanceCollectorTaskArg.java | 54 ++++++ .../VisorCacheRebalanceCollectorTaskResult.java | 92 +++++++++ .../visor/node/VisorNodeBaselineStatus.java | 45 +++++ .../visor/node/VisorNodeDataCollectorJob.java | 74 ++++--- .../node/VisorNodeDataCollectorTaskResult.java | 3 +- .../internal/visor/util/VisorTaskUtils.java | 43 ++++ 8 files changed, 564 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java new file mode 100644 index 0000000..5bd818d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.node; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Result object for cache rebalance job. + */ +public class VisorCacheRebalanceCollectorJobResult extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** Rebalance percent. */ + private double rebalance; + + /** Node baseline state. */ + private VisorNodeBaselineStatus baseline; + + /** + * Default constructor. + */ + public VisorCacheRebalanceCollectorJobResult() { + // No-op. + } + + /** + * @return Rebalance progress. + */ + public double getRebalance() { + return rebalance; + } + + /** + * @param rebalance Rebalance progress. + */ + public void setRebalance(double rebalance) { + this.rebalance = rebalance; + } + + /** + * @return Node baseline status. + */ + public VisorNodeBaselineStatus getBaseline() { + return baseline; + } + + /** + * @param baseline Node baseline status. + */ + public void setBaseline(VisorNodeBaselineStatus baseline) { + this.baseline = baseline; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + out.writeDouble(rebalance); + U.writeEnum(out, baseline); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + rebalance = in.readDouble(); + baseline = VisorNodeBaselineStatus.fromOrdinal(in.readByte()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheRebalanceCollectorJobResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java new file mode 100644 index 0000000..eda9f94 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.node; + +import java.util.Collection; +import java.util.List; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.visor.node.VisorNodeBaselineStatus.BASELINE_NOT_AVAILABLE; +import static org.apache.ignite.internal.visor.node.VisorNodeBaselineStatus.NODE_IN_BASELINE; +import static org.apache.ignite.internal.visor.node.VisorNodeBaselineStatus.NODE_NOT_IN_BASELINE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.MINIMAL_REBALANCE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.NOTHING_TO_REBALANCE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_COMPLETE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_NOT_AVAILABLE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.isProxyCache; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.isRestartingCache; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log; + +/** + * Collects topology rebalance metrics. + */ +@GridInternal +public class VisorCacheRebalanceCollectorTask extends VisorMultiNodeTask<VisorCacheRebalanceCollectorTaskArg, + VisorCacheRebalanceCollectorTaskResult, VisorCacheRebalanceCollectorJobResult> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorCacheRebalanceCollectorJob job(VisorCacheRebalanceCollectorTaskArg arg) { + return new VisorCacheRebalanceCollectorJob(arg, debug); + } + + /** {@inheritDoc} */ + @Nullable @Override protected VisorCacheRebalanceCollectorTaskResult reduce0(List<ComputeJobResult> results) { + return reduce(new VisorCacheRebalanceCollectorTaskResult(), results); + } + + /** + * @param taskRes Task result. + * @param results Results. + * @return Topology rebalance metrics collector task result. + */ + protected VisorCacheRebalanceCollectorTaskResult reduce( + VisorCacheRebalanceCollectorTaskResult taskRes, + List<ComputeJobResult> results + ) { + for (ComputeJobResult res : results) { + VisorCacheRebalanceCollectorJobResult jobRes = res.getData(); + + if (jobRes != null) { + if (res.getException() == null) + taskRes.getRebalance().put(res.getNode().id(), jobRes.getRebalance()); + + taskRes.getBaseline().put(res.getNode().id(), jobRes.getBaseline()); + } + } + + return taskRes; + } + + /** + * Job that collects rebalance metrics. + */ + private static class VisorCacheRebalanceCollectorJob extends VisorJob<VisorCacheRebalanceCollectorTaskArg, VisorCacheRebalanceCollectorJobResult> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Create job with given argument. + * + * @param arg Job argument. + * @param debug Debug flag. + */ + private VisorCacheRebalanceCollectorJob(VisorCacheRebalanceCollectorTaskArg arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected VisorCacheRebalanceCollectorJobResult run(VisorCacheRebalanceCollectorTaskArg arg) { + VisorCacheRebalanceCollectorJobResult res = new VisorCacheRebalanceCollectorJobResult(); + + long start0 = U.currentTimeMillis(); + + try { + int partitions = 0; + double total = 0; + double ready = 0; + + GridCacheProcessor cacheProc = ignite.context().cache(); + + boolean rebalanceInProgress = false; + + for (CacheGroupContext grp: cacheProc.cacheGroups()) { + String cacheName = grp.config().getName(); + + if (isProxyCache(ignite, cacheName) || isRestartingCache(ignite, cacheName)) + continue; + + try { + GridCacheAdapter ca = cacheProc.internalCache(cacheName); + + if (ca == null || !ca.context().started()) + continue; + + CacheMetrics cm = ca.localMetrics(); + + partitions += cm.getTotalPartitionsCount(); + + long keysTotal = cm.getEstimatedRebalancingKeys(); + long keysReady = cm.getRebalancedKeys(); + + if (keysReady >= keysTotal) + keysReady = Math.max(keysTotal - 1, 0); + + total += keysTotal; + ready += keysReady; + + if (cm.getRebalancingPartitionsCount() > 0) + rebalanceInProgress = true; + } + catch(IllegalStateException | IllegalArgumentException e) { + if (debug && ignite.log() != null) + ignite.log().error("Ignored cache group: " + grp.cacheOrGroupName(), e); + } + } + + if (partitions == 0) + res.setRebalance(NOTHING_TO_REBALANCE); + else if (total == 0 && rebalanceInProgress) + res.setRebalance(MINIMAL_REBALANCE); + else + res.setRebalance(total > 0 ? Math.max(ready / total, MINIMAL_REBALANCE) : REBALANCE_COMPLETE); + } + catch (Exception e) { + res.setRebalance(REBALANCE_NOT_AVAILABLE); + + ignite.log().error("Failed to collect rebalance metrics", e); + } + + if (GridCacheUtils.isPersistenceEnabled(ignite.configuration())) { + IgniteClusterEx cluster = ignite.cluster(); + + Object consistentId = ignite.localNode().consistentId(); + + Collection<? extends BaselineNode> baseline = cluster.currentBaselineTopology(); + + boolean inBaseline = baseline.stream().anyMatch(n -> consistentId.equals(n.consistentId())); + + res.setBaseline(inBaseline ? NODE_IN_BASELINE : NODE_NOT_IN_BASELINE); + } + else + res.setBaseline(BASELINE_NOT_AVAILABLE); + + if (debug) + log(ignite.log(), "Collected rebalance metrics", getClass(), start0); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheRebalanceCollectorJob.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java new file mode 100644 index 0000000..d97fd50 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.node; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.visor.VisorDataTransferObject; + +/** + * Argument for {@link VisorCacheRebalanceCollectorTask} task. + */ +public class VisorCacheRebalanceCollectorTaskArg extends VisorDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Default constructor. + */ + public VisorCacheRebalanceCollectorTaskArg() { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheRebalanceCollectorTaskArg.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java new file mode 100644 index 0000000..1305cd2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.node; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Result object for {@link VisorCacheRebalanceCollectorTask} task. + */ +public class VisorCacheRebalanceCollectorTaskResult extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** Rebalance state on nodes. */ + private Map<UUID, Double> rebalance = new HashMap<>(); + + /** Nodes baseline status. */ + private Map<UUID, VisorNodeBaselineStatus> baseline = new HashMap<>(); + + /** + * Default constructor. + */ + public VisorCacheRebalanceCollectorTaskResult() { + // No-op. + } + + /** + * @return Rebalance on nodes. + */ + public Map<UUID, Double> getRebalance() { + return rebalance; + } + + /** + * @return Baseline. + */ + public Map<UUID, VisorNodeBaselineStatus> getBaseline() { + return baseline; + } + + /** + * Add specified results. + * + * @param res Results to add. + */ + public void add(VisorCacheRebalanceCollectorTaskResult res) { + assert res != null; + + rebalance.putAll(res.getRebalance()); + baseline.putAll(res.getBaseline()); + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeMap(out, rebalance); + U.writeMap(out, baseline); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + rebalance = U.readMap(in); + baseline = U.readMap(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheRebalanceCollectorTaskResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java new file mode 100644 index 0000000..ea90be3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.node; + +import org.jetbrains.annotations.Nullable; + +/** + * Node baseline status. + */ +public enum VisorNodeBaselineStatus { + /** */ + NODE_IN_BASELINE, + /** */ + NODE_NOT_IN_BASELINE, + /** */ + BASELINE_NOT_AVAILABLE; + + /** Enumerated values. */ + private static final VisorNodeBaselineStatus[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value or {@code null} if ordinal out of range. + */ + @Nullable public static VisorNodeBaselineStatus fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index 9025ed0..9a7d2b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -28,8 +28,9 @@ import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; @@ -50,9 +51,15 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isIgfsC import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache; import static org.apache.ignite.internal.visor.compute.VisorComputeMonitoringHolder.COMPUTE_MONITORING_HOLDER_KEY; import static org.apache.ignite.internal.visor.util.VisorTaskUtils.EVT_MAPPER; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.MINIMAL_REBALANCE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.NOTHING_TO_REBALANCE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_COMPLETE; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_NOT_AVAILABLE; import static org.apache.ignite.internal.visor.util.VisorTaskUtils.VISOR_TASK_EVTS; import static org.apache.ignite.internal.visor.util.VisorTaskUtils.checkExplicitTaskMonitoring; import static org.apache.ignite.internal.visor.util.VisorTaskUtils.collectEvents; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.isProxyCache; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.isRestartingCache; import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log; /** @@ -141,18 +148,6 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa } /** - * @param cacheName Cache name to check. - * @return {@code true} if cache on local node is not a data cache or near cache disabled. - */ - private boolean proxyCache(String cacheName) { - GridDiscoveryManager discovery = ignite.context().discovery(); - - ClusterNode locNode = ignite.localNode(); - - return !(discovery.cacheAffinityNode(locNode, cacheName) || discovery.cacheNearNode(locNode, cacheName)); - } - - /** * Collect memory metrics. * * @param res Job result. @@ -194,38 +189,51 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa List<VisorCache> resCaches = res.getCaches(); - for (String cacheName : cacheProc.cacheNames()) { - if (proxyCache(cacheName)) - continue; + boolean rebalanceInProgress = false; - boolean sysCache = isSystemCache(cacheName); + for (CacheGroupContext grp : cacheProc.cacheGroups()) { + boolean first = true; - if (arg.getSystemCaches() || !(sysCache || isIgfsCache(cfg, cacheName))) { + for (GridCacheContext cache : grp.caches()) { long start0 = U.currentTimeMillis(); + String cacheName = cache.name(); + try { + if (isProxyCache(ignite, cacheName) || isRestartingCache(ignite, cacheName)) + continue; + GridCacheAdapter ca = cacheProc.internalCache(cacheName); if (ca == null || !ca.context().started()) continue; - CacheMetrics cm = ca.localMetrics(); + if (first) { + CacheMetrics cm = ca.localMetrics(); + + partitions += cm.getTotalPartitionsCount(); + + long keysTotal = cm.getEstimatedRebalancingKeys(); + long keysReady = cm.getRebalancedKeys(); + + if (keysReady >= keysTotal) + keysReady = Math.max(keysTotal - 1, 0); - partitions += cm.getTotalPartitionsCount(); + total += keysTotal; + ready += keysReady; - long partTotal = cm.getEstimatedRebalancingKeys(); - long partReady = cm.getRebalancedKeys(); + if (!rebalanceInProgress && cm.getRebalancingPartitionsCount() > 0) + rebalanceInProgress = true; - if (partReady >= partTotal) - partReady = Math.max(partTotal - 1, 0); + first = false; + } - total += partTotal; - ready += partReady; + boolean addToRes = arg.getSystemCaches() || !(isSystemCache(cacheName) || isIgfsCache(cfg, cacheName)); - if (all || cacheGrps.contains(ca.configuration().getGroupName())) + if (addToRes && (all || cacheGrps.contains(ca.configuration().getGroupName()))) resCaches.add(new VisorCache(ignite, ca, arg.isCollectCacheMetrics())); } - catch(IllegalStateException | IllegalArgumentException e) { + catch (IllegalStateException | IllegalArgumentException e) { if (debug && ignite.log() != null) ignite.log().error("Ignored cache: " + cacheName, e); } @@ -237,11 +245,14 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa } if (partitions == 0) - res.setRebalance(-1); + res.setRebalance(NOTHING_TO_REBALANCE); + else if (total == 0 && rebalanceInProgress) + res.setRebalance(MINIMAL_REBALANCE); else - res.setRebalance(total > 0 ? ready / total : 1); + res.setRebalance(total > 0 ? Math.max(ready / total, MINIMAL_REBALANCE) : REBALANCE_COMPLETE); } catch (Exception e) { + res.setRebalance(REBALANCE_NOT_AVAILABLE); res.setCachesEx(new VisorExceptionWrapper(e)); } } @@ -260,7 +271,8 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa FileSystemConfiguration igfsCfg = igfs.configuration(); - if (proxyCache(igfsCfg.getDataCacheConfiguration().getName()) || proxyCache(igfsCfg.getMetaCacheConfiguration().getName())) + if (isProxyCache(ignite, igfsCfg.getDataCacheConfiguration().getName()) || + isProxyCache(ignite, igfsCfg.getMetaCacheConfiguration().getName())) continue; try { http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java index eb161f8..f8eb869 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java @@ -131,7 +131,8 @@ public class VisorNodeDataCollectorTaskResult extends VisorDataTransferObject { readyTopVers.isEmpty() && pendingExchanges.isEmpty() && persistenceMetrics.isEmpty() && - persistenceMetricsEx.isEmpty(); + persistenceMetricsEx.isEmpty() && + rebalance.isEmpty(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java index fda9ba1..7ab1ffc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java @@ -58,6 +58,10 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.eviction.AbstractEvictionPolicyFactory; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -110,6 +114,19 @@ public class VisorTaskUtils { public static final int LOG_FILES_COUNT_LIMIT = 5000; /** */ + public static final int NOTHING_TO_REBALANCE = -1; + + /** */ + public static final int REBALANCE_NOT_AVAILABLE = -2; + + /** */ + public static final double MINIMAL_REBALANCE = 0.01; + + /** */ + public static final int REBALANCE_COMPLETE = 1; + + + /** */ private static final int DFLT_BUFFER_SIZE = 4096; /** Only task event types that Visor should collect. */ @@ -1248,4 +1265,30 @@ public class VisorTaskUtils { return Arrays.asList(addrs); } + + /** + * @param ignite Ignite. + * @param cacheName Cache name to check. + * @return {@code true} if cache on local node is not a data cache or near cache disabled. + */ + public static boolean isProxyCache(IgniteEx ignite, String cacheName) { + GridDiscoveryManager discovery = ignite.context().discovery(); + + ClusterNode locNode = ignite.localNode(); + + return !(discovery.cacheAffinityNode(locNode, cacheName) || discovery.cacheNearNode(locNode, cacheName)); + } + + /** + * Check whether cache restarting in progress. + * + * @param ignite Grid. + * @param cacheName Cache name to check. + * @return {@code true} when cache restarting in progress. + */ + public static boolean isRestartingCache(IgniteEx ignite, String cacheName) { + IgniteCacheProxy<Object, Object> proxy = ignite.context().cache().jcache(cacheName); + + return proxy instanceof IgniteCacheProxyImpl && ((IgniteCacheProxyImpl) proxy).isRestarting(); + } }
