Repository: ignite
Updated Branches:
  refs/heads/master bcbe8cc44 -> 86d31537f


ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780bf23d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780bf23d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780bf23d

Branch: refs/heads/master
Commit: 780bf23d5c89452dd062be4fab9e2e56d50bb9e2
Parents: 9b72d18
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Sep 19 18:19:33 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Sep 19 18:19:33 2016 +0300

----------------------------------------------------------------------
 .../spi/swapspace/file/FileSwapSpaceSpi.java    | 38 +++++++--
 .../CacheSwapUnswapGetTestSmallQueueSize.java   | 35 ++++++++
 .../file/GridFileSwapSpaceSpiSelfTest.java      | 89 ++++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |  2 +
 4 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 8809f08..9be5b93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -639,7 +639,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter 
implements SwapSpaceSpi,
         if (space == null && create) {
             validateName(name);
 
-            Space old = spaces.putIfAbsent(masked, space = new Space(masked));
+            Space old = spaces.putIfAbsent(masked, space = new Space(masked, 
log));
 
             if (old != null)
                 space = old;
@@ -833,13 +833,21 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter 
implements SwapSpaceSpi,
         /** */
         private final int maxSize;
 
+        /** */
+        private final IgniteLogger log;
+
+        /** */
+        private boolean queueSizeWarn;
+
         /**
          * @param minTakeSize Min size.
          * @param maxSize Max size.
+         * @param log logger
          */
-        private SwapValuesQueue(int minTakeSize, int maxSize) {
+        private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger 
log) {
             this.minTakeSize = minTakeSize;
             this.maxSize = maxSize;
+            this.log = log;
         }
 
         /**
@@ -852,8 +860,24 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter 
implements SwapSpaceSpi,
             lock.lock();
 
             try {
-                while (size + val.len > maxSize)
-                    mayAdd.await();
+                boolean largeVal = val.len > maxSize;
+
+                if (largeVal) {
+                    if (!queueSizeWarn) {
+                        U.warn(log, "Trying to save in swap entry which have 
size more than write queue size. " +
+                            "You may wish to increase 'maxWriteQueueSize' in 
FileSwapSpaceSpi configuration " +
+                            "[queueMaxSize=" + maxSize + ", valSize=" + 
val.len + ']');
+
+                        queueSizeWarn = true;
+                    }
+
+                    while (size >= minTakeSize)
+                        mayAdd.await();
+                }
+                else {
+                    while (size + val.len > maxSize)
+                        mayAdd.await();
+                }
 
                 size += val.len;
 
@@ -1419,7 +1443,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter 
implements SwapSpaceSpi,
         private SwapFile right;
 
         /** */
-        private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, 
maxWriteQueSize);
+        private final SwapValuesQueue que;
 
         /** Partitions. */
         private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, 
SwapValue>> parts =
@@ -1442,11 +1466,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter 
implements SwapSpaceSpi,
 
         /**
          * @param name Space name.
+         * @param log Logger.
          */
-        private Space(String name) {
+        private Space(String name, IgniteLogger log) {
             assert name != null;
 
             this.name = name;
+            this.que = new SwapValuesQueue(writeBufSize, maxWriteQueSize, log);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
new file mode 100644
index 0000000..8d189fe
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+
+/**
+ *
+ */
+public class CacheSwapUnswapGetTestSmallQueueSize extends 
CacheSwapUnswapGetTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((FileSwapSpaceSpi)cfg.getSwapSpaceSpi()).setMaxWriteQueueSize(2);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
index 64652b1..ab21165 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
@@ -25,11 +25,14 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -37,8 +40,10 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.swapspace.GridSwapSpaceSpiAbstractSelfTest;
 import org.apache.ignite.spi.swapspace.SwapKey;
 import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.junit.Assert;
 
 /**
  * Test for {@link FileSwapSpaceSpi}.
@@ -364,4 +369,88 @@ public class GridFileSwapSpaceSpiSelfTest extends 
GridSwapSpaceSpiAbstractSelfTe
 
         assertEquals(hash0, hash1);
     }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testSaveValueLargeThenQueueSize() throws 
IgniteCheckedException {
+        final String spaceName = "mySpace";
+        final SwapKey key = new SwapKey("key");
+
+        final byte[] val = new byte[FileSwapSpaceSpi.DFLT_QUE_SIZE * 2];
+        Arrays.fill(val, (byte)1);
+
+        IgniteInternalFuture<byte[]> fut = GridTestUtils.runAsync(new 
Callable<byte[]>() {
+            @Override public byte[] call() throws Exception {
+                return saveAndGet(spaceName, key, val);
+            }
+        });
+
+        byte[] bytes = fut.get(10_000);
+
+        Assert.assertArrayEquals(val, bytes);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testSaveValueLargeThenQueueSizeMultiThreaded() throws 
Exception {
+        final String spaceName = "mySpace";
+
+        final int threads = 5;
+
+        long DURATION = 30_000;
+
+        final int maxSize = FileSwapSpaceSpi.DFLT_QUE_SIZE * 2;
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            IgniteInternalFuture<?> fut = multithreadedAsync(new 
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (!done.get()) {
+                        SwapKey key = new SwapKey(rnd.nextInt(1000));
+
+                        spi.store(spaceName, key, new byte[rnd.nextInt(0, 
maxSize)], context());
+                    }
+
+                    return null;
+                }
+            }, threads, " async-put");
+
+            Thread.sleep(DURATION);
+
+            done.set(true);
+
+            fut.get();
+        }
+        finally {
+           done.set(true);
+        }
+    }
+
+    /**
+     * @param spaceName Space name.
+     * @param key Key.
+     * @param val Value.
+     * @throws Exception If failed.
+     * @return Read bytes.
+     */
+    private byte[] saveAndGet(final String spaceName, final SwapKey key, 
byte[] val) throws Exception {
+        spi.store(spaceName, key, val, context());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return spi.read(spaceName, key, context()) != null;
+            }
+        }, 10_000);
+
+        byte[] res = spi.read(spaceName, key, context());
+
+        assertNotNull(res);
+
+        return res;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 60d59d7..c494e73 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
 import 
org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
 import 
org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
 import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import 
org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTestSmallQueueSize;
 import 
org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
 import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
@@ -304,6 +305,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         
suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class);
 
         suite.addTestSuite(CacheSwapUnswapGetTest.class);
+        suite.addTestSuite(CacheSwapUnswapGetTestSmallQueueSize.class);
 
         suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class);
         suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);

Reply via email to