IGNITE-4001: Timeouts for threads in Ignite pools. This closes #1130.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23461b8d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23461b8d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23461b8d

Branch: refs/heads/ignite-ssl-hotfix
Commit: 23461b8d33922772ef8e7217e9e87b3f3b0b37b1
Parents: a92f20b
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Thu Oct 6 10:14:59 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Thu Oct 6 10:14:59 2016 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      | 18 +++--
 .../org/apache/ignite/internal/IgnitionEx.java  | 71 ++++++++++----------
 .../processors/igfs/IgfsThreadFactory.java      | 61 +++++++++++++++++
 3 files changed, 112 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/23461b8d/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 19b9a4d..73de470 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,29 +149,38 @@ public class IgniteConfiguration {
     public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT) * 2;
 
     /** Default keep alive time for public thread pool. */
+    @Deprecated
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
 
     /** Default limit of threads used for rebalance. */
     public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1;
 
     /** Default max queue capacity of public thread pool. */
+    @Deprecated
     public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
 
     /** Default size of system thread pool. */
     public static final int DFLT_SYSTEM_CORE_THREAD_CNT = DFLT_PUBLIC_THREAD_CNT;
 
     /** Default max size of system thread pool. */
+    @Deprecated
     public static final int DFLT_SYSTEM_MAX_THREAD_CNT = DFLT_PUBLIC_THREAD_CNT;
 
     /** Default keep alive time for system thread pool. */
+    @Deprecated
     public static final long DFLT_SYSTEM_KEEP_ALIVE_TIME = 0;
 
     /** Default keep alive time for utility thread pool. */
+    @Deprecated
     public static final long DFLT_UTILITY_KEEP_ALIVE_TIME = 10_000;
 
     /** Default max queue capacity of system thread pool. */
+    @Deprecated
     public static final int DFLT_SYSTEM_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
 
+    /** Default Ignite thread keep alive time. */
+    public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L;
+
     /** Default size of peer class loading thread pool. */
     public static final int DFLT_P2P_THREAD_CNT = 2;
 
@@ -240,13 +249,13 @@ public class IgniteConfiguration {
     private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
     /** Utility cache pool keep alive time. */
-    private long utilityCacheKeepAliveTime = DFLT_UTILITY_KEEP_ALIVE_TIME;
+    private long utilityCacheKeepAliveTime = DFLT_THREAD_KEEP_ALIVE_TIME;
 
     /** Marshaller pool size. */
     private int marshCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
     /** Marshaller pool keep alive time. */
-    private long marshCacheKeepAliveTime = DFLT_UTILITY_KEEP_ALIVE_TIME;
+    private long marshCacheKeepAliveTime = DFLT_THREAD_KEEP_ALIVE_TIME;
 
     /** P2P pool size. */
     private int p2pPoolSize = DFLT_P2P_THREAD_CNT;
@@ -492,6 +501,7 @@ public class IgniteConfiguration {
         cacheCfg = cfg.getCacheConfiguration();
         cacheKeyCfg = cfg.getCacheKeyConfiguration();
         cacheSanityCheckEnabled = cfg.isCacheSanityCheckEnabled();
+        callbackPoolSize = cfg.getAsyncCallbackPoolSize();
         connectorCfg = cfg.getConnectorConfiguration();
         classLdr = cfg.getClassLoader();
         clientMode = cfg.isClientMode();
@@ -792,7 +802,7 @@ public class IgniteConfiguration {
     /**
      * Keep alive time of thread pool that is in charge of processing utility cache messages.
      * <p>
-     * If not provided, executor service will have keep alive time {@link #DFLT_UTILITY_KEEP_ALIVE_TIME}.
+     * If not provided, executor service will have keep alive time {@link #DFLT_THREAD_KEEP_ALIVE_TIME}.
      *
      * @return Thread pool keep alive time (in milliseconds) to be used in grid for utility cache messages.
      */
@@ -814,7 +824,7 @@ public class IgniteConfiguration {
     /**
      * Keep alive time of thread pool that is in charge of processing marshaller messages.
      * <p>
-     * If not provided, executor service will have keep alive time {@link #DFLT_UTILITY_KEEP_ALIVE_TIME}.
+     * If not provided, executor service will have keep alive time {@link #DFLT_THREAD_KEEP_ALIVE_TIME}.
      *
      * @return Thread pool keep alive time (in milliseconds) to be used in grid for marshaller messages.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/23461b8d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 2914c7c..001f599 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -36,7 +36,6 @@ import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +61,7 @@ import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -119,10 +119,7 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_PUBLIC_KEEP_ALIVE_TIME;
-import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_PUBLIC_THREADPOOL_QUEUE_CAP;
-import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SYSTEM_KEEP_ALIVE_TIME;
-import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SYSTEM_THREADPOOL_QUEUE_CAP;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
 import static org.apache.ignite.internal.IgniteComponentType.SPRING;
 import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_JVM;
 
@@ -1457,28 +1454,28 @@ public class IgnitionEx {
         private volatile IgniteKernal grid;
 
         /** Executor service. */
-        private ExecutorService execSvc;
+        private ThreadPoolExecutor execSvc;
 
         /** System executor service. */
-        private ExecutorService sysExecSvc;
+        private ThreadPoolExecutor sysExecSvc;
 
         /** Management executor service. */
-        private ExecutorService mgmtExecSvc;
+        private ThreadPoolExecutor mgmtExecSvc;
 
         /** P2P executor service. */
-        private ExecutorService p2pExecSvc;
+        private ThreadPoolExecutor p2pExecSvc;
 
         /** IGFS executor service. */
-        private ExecutorService igfsExecSvc;
+        private ThreadPoolExecutor igfsExecSvc;
 
         /** REST requests executor service. */
-        private ExecutorService restExecSvc;
+        private ThreadPoolExecutor restExecSvc;
 
         /** Utility cache executor service. */
-        private ExecutorService utilityCacheExecSvc;
+        private ThreadPoolExecutor utilityCacheExecSvc;
 
         /** Marshaller cache executor service. */
-        private ExecutorService marshCacheExecSvc;
+        private ThreadPoolExecutor marshCacheExecSvc;
 
         /** Continuous query executor service. */
         private IgniteStripedThreadPoolExecutor callbackExecSvc;
@@ -1642,12 +1639,10 @@ public class IgnitionEx {
                 cfg.getGridName(),
                 cfg.getPublicThreadPoolSize(),
                 cfg.getPublicThreadPoolSize(),
-                DFLT_PUBLIC_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+                DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>());
 
-            if (!myCfg.isClientMode())
-                // Pre-start all threads as they are guaranteed to be needed.
-                ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads();
+            execSvc.allowCoreThreadTimeOut(true);
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -1656,11 +1651,10 @@ public class IgnitionEx {
                 cfg.getGridName(),
                 cfg.getSystemThreadPoolSize(),
                 cfg.getSystemThreadPoolSize(),
-                DFLT_SYSTEM_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+                DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>());
 
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads();
+            sysExecSvc.allowCoreThreadTimeOut(true);
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -1671,9 +1665,11 @@ public class IgnitionEx {
                 cfg.getGridName(),
                 cfg.getManagementThreadPoolSize(),
                 cfg.getManagementThreadPoolSize(),
-                0,
+                DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>());
 
+            mgmtExecSvc.allowCoreThreadTimeOut(true);
+
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
             // Note, that we do not pre-start threads here as class loading pool may
@@ -1683,20 +1679,21 @@ public class IgnitionEx {
                 cfg.getGridName(),
                 cfg.getPeerClassLoadingThreadPoolSize(),
                 cfg.getPeerClassLoadingThreadPoolSize(),
-                0,
+                DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>());
 
+            p2pExecSvc.allowCoreThreadTimeOut(true);
+
             // Note that we do not pre-start threads here as igfs pool may not be needed.
             igfsExecSvc = new IgniteThreadPoolExecutor(
-                "igfs",
-                cfg.getGridName(),
                 cfg.getIgfsThreadPoolSize(),
                 cfg.getIgfsThreadPoolSize(),
-                0,
-                new LinkedBlockingQueue<Runnable>());
+                DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>(),
+                new IgfsThreadFactory(cfg.getGridName(), "igfs"),
+                null /* Abort policy will be used. */);
 
-            // Pre-start all threads to avoid HadoopClassLoader leaks.
-            ((ThreadPoolExecutor)igfsExecSvc).prestartAllCoreThreads();
+            igfsExecSvc.allowCoreThreadTimeOut(true);
 
             // Note that we do not pre-start threads here as this pool may not be needed.
             callbackExecSvc = new IgniteStripedThreadPoolExecutor(
@@ -1710,9 +1707,11 @@ public class IgnitionEx {
                     myCfg.getGridName(),
                     myCfg.getConnectorConfiguration().getThreadPoolSize(),
                     myCfg.getConnectorConfiguration().getThreadPoolSize(),
-                    ConnectorConfiguration.DFLT_KEEP_ALIVE_TIME,
-                    new LinkedBlockingQueue<Runnable>(ConnectorConfiguration.DFLT_THREADPOOL_QUEUE_CAP)
+                    DFLT_THREAD_KEEP_ALIVE_TIME,
+                    new LinkedBlockingQueue<Runnable>()
                 );
+
+                restExecSvc.allowCoreThreadTimeOut(true);
             }
 
             utilityCacheExecSvc = new IgniteThreadPoolExecutor(
@@ -1721,7 +1720,9 @@ public class IgnitionEx {
                 myCfg.getUtilityCacheThreadPoolSize(),
                 myCfg.getUtilityCacheThreadPoolSize(),
                 myCfg.getUtilityCacheKeepAliveTime(),
-                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+                new LinkedBlockingQueue<Runnable>());
+
+            utilityCacheExecSvc.allowCoreThreadTimeOut(true);
 
             marshCacheExecSvc = new IgniteThreadPoolExecutor(
                 "marshaller-cache",
@@ -1729,7 +1730,9 @@ public class IgnitionEx {
                 myCfg.getMarshallerCacheThreadPoolSize(),
                 myCfg.getMarshallerCacheThreadPoolSize(),
                 myCfg.getMarshallerCacheKeepAliveTime(),
-                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+                new LinkedBlockingQueue<Runnable>());
+
+            marshCacheExecSvc.allowCoreThreadTimeOut(true);
 
             // Register Ignite MBean for current grid instance.
             registerFactoryMbean(myCfg.getMBeanServer());

http://git-wip-us.apache.org/repos/asf/ignite/blob/23461b8d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java
new file mode 100644
index 0000000..32cab0f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.thread.IgniteThreadFactory;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Special thread factory used only for IGFS pool which prevents {@link HadoopClassLoader} leak into
+ * {@code Thread.contextClassLoader} field. To achieve this we switch context class loader back and forth when
+ * creating threads.
+ */
+public class IgfsThreadFactory extends IgniteThreadFactory {
+    /**
+     * Constructor.
+     *
+     * @param gridName Grid name.
+     * @param threadName Thread name.
+     */
+    public IgfsThreadFactory(String gridName, String threadName) {
+        super(gridName, threadName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Thread newThread(@NotNull Runnable r) {
+        Thread curThread = Thread.currentThread();
+
+        ClassLoader oldLdr = curThread.getContextClassLoader();
+
+        curThread.setContextClassLoader(IgfsThreadFactory.class.getClassLoader());
+
+        try {
+            return super.newThread(r);
+        }
+        finally {
+            curThread.setContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsThreadFactory.class, this);
+    }
+}

Reply via email to