Repository: asterixdb
Updated Branches:
  refs/heads/master 6daa1b152 -> 35a1b14f7


[ASTERIXDB-2229][OTR] Restore Thread Names in Thread Pool

- user model changes: no
- storage format changes: no
- interface changes: no

- Restore thread names to their original names
  before returning them to the pool of the NC
  app.
- Remove explicit thread name resets.
- Delete unused ThreadExecutor class.

Change-Id: I3bda1b65e7aefd35d2b8cfa814f73369c3bf5a18
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2447
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/35a1b14f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/35a1b14f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/35a1b14f

Branch: refs/heads/master
Commit: 35a1b14f731b97a4fde99b242be94fab2b932a38
Parents: 6daa1b1
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Sun Mar 4 09:23:17 2018 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Sun Mar 4 22:18:47 2018 -0800

----------------------------------------------------------------------
 .../apache/asterix/app/active/RecoveryTask.java | 13 ++---
 .../asterix/app/nc/NCAppRuntimeContext.java     |  5 +-
 .../asterix/common/api/ThreadExecutor.java      | 43 ---------------
 .../FeedIntakeOperatorNodePushable.java         |  2 -
 .../management/ReplicationChannel.java          |  2 -
 .../control/cc/ClusterControllerService.java    |  4 +-
 .../control/nc/NodeControllerService.java       |  4 +-
 .../org/apache/hyracks/control/nc/Task.java     |  4 --
 .../MaterializingPipelinedPartition.java        |  2 -
 .../MaintainedThreadNameExecutorService.java    | 56 ++++++++++++++++++++
 10 files changed, 66 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index dacd0ee..2de8319 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -59,19 +59,12 @@ public class RecoveryTask {
 
     public Callable<Void> recover() {
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-            return () -> {
-                return null;
-            };
+            return () -> null;
         }
         IRetryPolicy policy = retryPolicyFactory.create(listener);
         return () -> {
-            String nameBefore = Thread.currentThread().getName();
-            try {
-                Thread.currentThread().setName("RecoveryTask (" + 
listener.getEntityId() + ")");
-                doRecover(policy);
-            } finally {
-                Thread.currentThread().setName(nameBefore);
-            }
+            Thread.currentThread().setName("RecoveryTask (" + 
listener.getEntityId() + ")");
+            doRecover(policy);
             return null;
         };
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index c724952..288e5f2 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.active.ActiveManager;
@@ -102,6 +101,7 @@ import 
org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import org.apache.hyracks.storage.common.file.FileMapManager;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -171,7 +171,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     @Override
     public void initialize(boolean initialRun) throws IOException, 
ACIDException {
         ioManager = getServiceContext().getIoManager();
-        threadExecutor = 
Executors.newCachedThreadPool(getServiceContext().getThreadFactory());
+        threadExecutor =
+                
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
         IPageReplacementStrategy prs = new 
ClockPageReplacementStrategy(allocator,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
deleted file mode 100644
index 03cead0..0000000
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.api;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-
-public class ThreadExecutor implements Executor {
-    private final ExecutorService executorService;
-
-    public ThreadExecutor(ThreadFactory threadFactory) {
-        executorService = Executors.newCachedThreadPool(threadFactory);
-    }
-
-    @Override
-    public void execute(Runnable command) {
-        executorService.execute(command);
-    }
-
-    public <T> Future<T> submit(Callable<T> command) {
-        return executorService.submit(command);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 6c95cc2..0503677 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -60,7 +60,6 @@ public class FeedIntakeOperatorNodePushable extends 
ActiveSourceOperatorNodePush
 
     @Override
     protected void start() throws HyracksDataException, InterruptedException {
-        String before = Thread.currentThread().getName();
         Thread.currentThread().setName("Intake Thread");
         try {
             writer.open();
@@ -85,7 +84,6 @@ public class FeedIntakeOperatorNodePushable extends 
ActiveSourceOperatorNodePush
             throw e;
         } finally {
             writer.close();
-            Thread.currentThread().setName(before);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 417164c..bc93294 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -106,7 +106,6 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
 
         @Override
         public void run() {
-            final String oldName = Thread.currentThread().getName();
             Thread.currentThread().setName("Replication Worker");
             try {
                 ReplicationRequestType requestType = 
ReplicationProtocol.getRequestType(socketChannel, inBuffer);
@@ -124,7 +123,6 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                         LOGGER.warn("Failed to close replication socket.", e);
                     }
                 }
-                Thread.currentThread().setName(oldName);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index f8fe77f..caef91a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,7 +34,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
@@ -82,6 +81,7 @@ import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -232,7 +232,7 @@ public class ClusterControllerService implements 
IControllerService {
         serviceCtx = new CCServiceContext(this, serverCtx, ccContext, 
ccConfig.getAppConfig());
         serviceCtx.addJobLifecycleListener(datasetDirectoryService);
         application.init(serviceCtx);
-        executor = 
Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+        executor = 
MaintainedThreadNameExecutorService.newCachedThreadPool(serviceCtx.getThreadFactory());
         application.start(ccConfig.getAppArgsArray());
         IJobCapacityController jobCapacityController = 
application.getJobCapacityController();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0d0c535..b67bfac 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -37,7 +37,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -94,6 +93,7 @@ import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
 import org.apache.hyracks.util.PidHelper;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.Tracer;
@@ -480,7 +480,7 @@ public class NodeControllerService implements 
IControllerService {
         serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, 
memoryManager, lccm,
                 ncConfig.getNodeScopedAppConfig());
         application.init(serviceCtx);
-        executor = 
Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+        executor = 
MaintainedThreadNameExecutorService.newCachedThreadPool(serviceCtx.getThreadFactory());
         application.start(ncConfig.getAppArgsArray());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9b32cc7..dcfc291 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -275,7 +275,6 @@ public class Task implements IHyracksTaskContext, 
ICounterContext, Runnable {
     @Override
     public void run() {
         Thread ct = Thread.currentThread();
-        String threadName = ct.getName();
         // Calls synchronized addPendingThread(..) to make sure that in the 
abort() method,
         // the thread is not escaped from interruption.
         if (!addPendingThread(ct)) {
@@ -304,7 +303,6 @@ public class Task implements IHyracksTaskContext, 
ICounterContext, Runnable {
                             if (!addPendingThread(thread)) {
                                 return;
                             }
-                            String oldName = thread.getName();
                             thread.setName(displayName + ":" + taskAttemptId + 
":" + cIdx);
                             thread.setPriority(Thread.MIN_PRIORITY);
                             try {
@@ -314,7 +312,6 @@ public class Task implements IHyracksTaskContext, 
ICounterContext, Runnable {
                                     exceptions.add(e);
                                 }
                             } finally {
-                                thread.setName(oldName);
                                 sem.release();
                                 removePendingThread(thread);
                             }
@@ -351,7 +348,6 @@ public class Task implements IHyracksTaskContext, 
ICounterContext, Runnable {
         } catch (Exception e) {
             exceptions.add(e);
         } finally {
-            ct.setName(threadName);
             close();
             removePendingThread(ct);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index a782bca..29c2ff2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -88,7 +88,6 @@ public class MaterializingPipelinedPartition implements 
IFrameWriter, IPartition
             public void run() {
                 Thread thread = Thread.currentThread();
                 setDataConsumerThread(thread); // Sets the data consumer 
thread to the current thread.
-                String oldName = thread.getName();
                 try {
                     
thread.setName(MaterializingPipelinedPartition.class.getName() + pid);
                     FileReference fRefCopy;
@@ -167,7 +166,6 @@ public class MaterializingPipelinedPartition implements 
IFrameWriter, IPartition
                 } catch (Exception e) {
                     LOGGER.log(Level.ERROR, e.getMessage(), e);
                 } finally {
-                    thread.setName(oldName);
                     setDataConsumerThread(null); // Sets back the data 
consumer thread to null.
                 }
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/35a1b14f/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MaintainedThreadNameExecutorService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MaintainedThreadNameExecutorService.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MaintainedThreadNameExecutorService.java
new file mode 100644
index 0000000..a9ebb50
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MaintainedThreadNameExecutorService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class MaintainedThreadNameExecutorService extends ThreadPoolExecutor {
+
+    private final Map<Thread, String> threadNames = new ConcurrentHashMap<>();
+
+    private MaintainedThreadNameExecutorService(ThreadFactory threadFactory) {
+        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new 
SynchronousQueue<>(), threadFactory);
+    }
+
+    public static ExecutorService newCachedThreadPool(ThreadFactory 
threadFactory) {
+        return new MaintainedThreadNameExecutorService(threadFactory);
+    }
+
+    @Override
+    protected void beforeExecute(Thread t, Runnable r) {
+        threadNames.put(t, t.getName());
+        super.beforeExecute(t, r);
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        final Thread thread = Thread.currentThread();
+        final String originalThreadName = threadNames.remove(thread);
+        if (originalThreadName != null) {
+            thread.setName(originalThreadName);
+        }
+    }
+}

Reply via email to