IGNITE-4756 Print info about partition distribution to log

Signed-off-by: Anton Vinogradov <a...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3eb1f5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3eb1f5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3eb1f5d

Branch: refs/heads/ignite-7708
Commit: a3eb1f5d753a38c4019440e1bf39d00bc6136455
Parents: 0e73fa2
Author: Vyacheslav Daradur <daradu...@gmail.com>
Authored: Wed Apr 11 14:41:29 2018 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Wed Apr 11 14:41:29 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   7 +
 .../affinity/GridAffinityAssignmentCache.java   |  50 +++-
 .../AffinityDistributionLoggingTest.java        | 268 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite5.java       |   9 +-
 4 files changed, 327 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 9da123e..04eb425 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -857,6 +857,13 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_BPLUS_TREE_LOCK_RETRIES = 
"IGNITE_BPLUS_TREE_LOCK_RETRIES";
 
     /**
+     * The threshold of uneven distribution (in percent) above which partition distribution will be logged.
+     *
+     * The default is '50', which means: log nodes whose partition count differs from the expected one by more than 50%.
+     */
+    public static final String IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD = 
"IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 18edd02..b1899e3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -34,13 +34,14 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.cluster.NodeOrderComparator;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.NodeOrderComparator;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
@@ -53,7 +54,10 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.getFloat;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 
 /**
@@ -63,6 +67,9 @@ public class GridAffinityAssignmentCache {
     /** Cleanup history size. */
     private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 
500);
 
+    /** Threshold (in percent) of uneven partition distribution above which the distribution is logged. */
+    private final float partDistribution = 
getFloat(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, 50f);
+
     /** Group name if specified or cache name. */
     private final String cacheOrGrpName;
 
@@ -367,6 +374,9 @@ public class GridAffinityAssignmentCache {
 
         idealAssignment = assignment;
 
+        if (ctx.cache().cacheMode(cacheOrGrpName) == PARTITIONED)
+            printDistributionIfThresholdExceeded(assignment, sorted.size());
+
         if (hasBaseline) {
             baselineTopology = discoCache.state().baselineTopology();
             assert baselineAssignment != null;
@@ -418,6 +428,44 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * Counts the primary and backup partitions assigned to the local node and logs them if the deviation
+     * from an even distribution exceeds the threshold {@link #partDistribution} (in percent).
+     * <p>
+     * The expected number of backup copies takes into account that a partition cannot have more backups
+     * than there are other affinity nodes, so a cluster smaller than {@code backups + 1} nodes is not
+     * reported as unevenly distributed merely because of its size.
+     * <p>
+     * NOTE(review): a local node that owns no partitions at all (for example a client node, if this method
+     * is invoked there) always shows a 100% deviation - confirm that the caller filters such nodes out.
+     *
+     * @param assignments Assignments to calculate partitions distribution.
+     * @param nodes Affinity nodes number.
+     * @see IgniteSystemProperties#IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD
+     */
+    private void printDistributionIfThresholdExceeded(List<List<ClusterNode>> assignments, int nodes) {
+        // Without affinity nodes there is no expected value to compare with.
+        if (nodes <= 0)
+            return;
+
+        int locPrimaryCnt = 0;
+        int locBackupCnt = 0;
+
+        for (List<ClusterNode> assignment : assignments) {
+            for (int i = 0; i < assignment.size(); i++) {
+                if (assignment.get(i).isLocal()) {
+                    // The first node of a partition's list is counted as primary, all others as backups.
+                    if (i == 0)
+                        locPrimaryCnt++;
+                    else
+                        locBackupCnt++;
+                }
+            }
+        }
+
+        float expPrimaryCnt = (float)partsCnt / nodes;
+
+        // A partition can have at most (nodes - 1) backups regardless of the configured value.
+        float expBackupCnt = expPrimaryCnt * Math.min(backups, nodes - 1);
+
+        float deltaPrimary = deviationPercent(locPrimaryCnt, expPrimaryCnt);
+        float deltaBackup = deviationPercent(locBackupCnt, expBackupCnt);
+
+        if (deltaPrimary > partDistribution || deltaBackup > partDistribution) {
+            log.info(String.format("Local node affinity assignment distribution is not ideal " +
+                    "[cache=%s, expectedPrimary=%.2f, actualPrimary=%d, " +
+                    "expectedBackups=%.2f, actualBackups=%d, warningThreshold=%.2f%%]",
+                cacheOrGrpName, expPrimaryCnt, locPrimaryCnt,
+                expBackupCnt, locBackupCnt, partDistribution));
+        }
+    }
+
+    /**
+     * Computes how far an actual count is from the expected one, as a percentage of the expected count.
+     *
+     * @param actual Actual number of partitions.
+     * @param exp Expected number of partitions.
+     * @return Deviation in percent; {@code 0} if both values are zero, positive infinity if only
+     *      the expected value is zero.
+     */
+    private static float deviationPercent(int actual, float exp) {
+        // Avoid 0/0 (NaN): nothing expected and nothing assigned is a perfect match.
+        if (exp == 0)
+            return actual == 0 ? 0 : Float.POSITIVE_INFINITY;
+
+        return Math.abs(1 - actual / exp) * 100;
+    }
+
+    /**
      * Copies previous affinity assignment when discovery event does not cause 
affinity assignment changes
      * (e.g. client node joins on leaves).
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java
new file mode 100644
index 0000000..813c830
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridStringLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ * Tests of partitions distribution logging.
+ *
+ * Tests based on using of affinity function which provides an even 
distribution of partitions between nodes.
+ *
+ * @see EvenDistributionAffinityFunction
+ */
+public class AffinityDistributionLoggingTest extends GridCommonAbstractTest {
+    /** Pattern to test. */
+    private static final String LOG_MESSAGE_PREFIX = "Local node affinity 
assignment distribution is not ideal ";
+
+    /** Partitions number. */
+    private int parts = 0;
+
+    /** Nodes number. */
+    private int nodes = 0;
+
+    /** Backups number. */
+    private int backups = 0;
+
+    /** For storing original value of system property. */
+    private String tempProp;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        tempProp = System.getProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        if (tempProp != null)
+            System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, 
tempProp);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        System.clearProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD);
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setBackups(backups);
+        cacheCfg.setAffinity(new EvenDistributionAffinityFunction(parts));
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception In case of an error.
+     */
+    public void test2PartitionsIdealDistributionIsNotLogged() throws Exception 
{
+        System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "0");
+
+        nodes = 2;
+        parts = 2;
+        backups = 1;
+
+        String testsLog = runAndGetExchangeLog();
+
+        assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+    }
+
+    /**
+     * @throws Exception In case of an error.
+     */
+    public void test120PartitionsIdeadDistributionIsNotLogged() throws 
Exception {
+        System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "0.0");
+
+        nodes = 3;
+        parts = 120;
+        backups = 2;
+
+        String testsLog = runAndGetExchangeLog();
+
+        assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+    }
+
+    /**
+     * @throws Exception In case of an error.
+     */
+    public void test5PartitionsNotIdealDistributionIsLogged() throws Exception 
{
+        System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "50.0");
+
+        nodes = 4;
+        parts = 5;
+        backups = 3;
+
+        String testsLog = runAndGetExchangeLog();
+
+        assertTrue(testsLog.contains(LOG_MESSAGE_PREFIX));
+    }
+
+    /**
+     * @throws Exception In case of an error.
+     */
+    public void test7PartitionsNotIdealDistributionSuppressedLogging() throws 
Exception {
+        System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "50.0");
+
+        nodes = 3;
+        parts = 7;
+        backups = 0;
+
+        String testsLog = runAndGetExchangeLog();
+
+        assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+    }
+
+    /**
+     * @throws Exception In case of an error.
+     */
+    public void test5PartitionsNotIdealDistributionSuppressedLogging() throws 
Exception {
+        System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "65");
+
+        nodes = 4;
+        parts = 5;
+        backups = 3;
+
+        String testsLog = runAndGetExchangeLog();
+
+        assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+    }
+
+    /**
+     * Starts a specified number of Ignite nodes and log partition node 
exchange during a last node's startup.
+     *
+     * @return Log of latest partition map exchange.
+     * @throws Exception In case of an error.
+     */
+    private String runAndGetExchangeLog() throws Exception {
+        assert nodes > 1;
+
+        IgniteEx ignite = (IgniteEx)startGrids(nodes - 1);
+
+        awaitPartitionMapExchange();
+
+        GridCacheProcessor proc = ignite.context().cache();
+
+        GridCacheContext cctx = 
proc.context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME));
+
+        final GridStringLogger log = new GridStringLogger(false, this.log);
+
+        GridAffinityAssignmentCache aff = 
GridTestUtils.getFieldValue(cctx.affinity(), "aff");
+
+        GridTestUtils.setFieldValue(aff, "log", log);
+
+        startGrid(nodes);
+
+        awaitPartitionMapExchange();
+
+        return log.toString();
+    }
+
+    /**
+     * Affinity function for a partitioned cache which provides even 
distribution partitions between nodes in cluster.
+     */
+    private static class EvenDistributionAffinityFunction implements 
AffinityFunction {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Partitions number. */
+        private int parts;
+
+        /**
+         * @param parts Number of partitions for one cache.
+         */
+        private EvenDistributionAffinityFunction(int parts) {
+            this.parts = parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return key.hashCode() % parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> 
assignPartitions(AffinityFunctionContext affCtx) {
+            List<ClusterNode> nodes = new 
ArrayList<>(affCtx.currentTopologySnapshot());
+
+            nodes.sort(Comparator.comparing(o -> 
o.<String>attribute(ATTR_IGNITE_INSTANCE_NAME)));
+
+            List<List<ClusterNode>> res = new ArrayList<>(parts);
+
+            for (int i = 0; i < parts; i++) {
+                Set<ClusterNode> n0 = new LinkedHashSet<>();
+
+                n0.add(nodes.get(i % nodes.size()));
+
+                for (int j = 1; j <= affCtx.backups(); j++)
+                    n0.add(nodes.get((i + j) % nodes.size()));
+
+                res.add(new ArrayList<>(n0));
+            }
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 7c41e49..945a76c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.GridCacheAffinityBackupsSelfTest;
 import org.apache.ignite.IgniteCacheAffinitySelfTest;
 import org.apache.ignite.cache.affinity.AffinityClientNodeSelfTest;
+import org.apache.ignite.cache.affinity.AffinityDistributionLoggingTest;
 import org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest;
 import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest;
 import 
org.apache.ignite.internal.GridCachePartitionExchangeManagerHistSizeTest;
@@ -35,13 +36,7 @@ import 
org.apache.ignite.internal.processors.cache.EntryVersionConsistencyReadTh
 import 
org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughEvictionsVariationsSuite;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest;
-import 
org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest;
-import 
org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest;
-import 
org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.PartitionsExchangeOnDiscoveryHistoryOverflowTest;
-import 
org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest;
-import 
org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
-import 
org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
@@ -95,6 +90,8 @@ public class IgniteCacheTestSuite5 extends TestSuite {
         suite.addTestSuite(LocalAffinityFunctionTest.class);
         suite.addTestSuite(AffinityHistoryCleanupTest.class);
 
+        suite.addTestSuite(AffinityDistributionLoggingTest.class);
+
         suite.addTestSuite(IgniteCacheAtomicProtocolTest.class);
 
         
suite.addTestSuite(PartitionsExchangeOnDiscoveryHistoryOverflowTest.class);

Reply via email to