Reamer commented on a change in pull request #3925:
URL: https://github.com/apache/zeppelin/pull/3925#discussion_r501730956



##########
File path: 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom 
(https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = 
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, 
threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);

Review comment:
       Remove this log statement, or lower it to debug level.

##########
File path: bin/interpreter.sh
##########
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-
+set -x

Review comment:
       Remove this `set -x`; it looks like a leftover from debugging.

##########
File path: 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom 
(https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = 
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, 
threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int 
maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", 
name, maxConcurrency);
+    if (!schedulers.containsKey(name)) {
+      ParallelScheduler s = new ParallelScheduler(name, maxConcurrency, 5, 
TimeUnit.SECONDS);
+      LOGGER.info("new ParallelScheduler {}", s);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
-  
+
   public Scheduler createOrGetScheduler(Scheduler scheduler) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(scheduler.getName())) {
-        schedulers.put(scheduler.getName(), scheduler);
-        executor.execute(scheduler);
-      }
-      return schedulers.get(scheduler.getName());
+    LOGGER.info("register new scheduler {}", scheduler.getName());

Review comment:
       Remove this log statement, or lower it to debug level.

##########
File path: 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom 
(https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = 
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, 
threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int 
maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", 
name, maxConcurrency);
+    if (!schedulers.containsKey(name)) {
+      ParallelScheduler s = new ParallelScheduler(name, maxConcurrency, 5, 
TimeUnit.SECONDS);
+      LOGGER.info("new ParallelScheduler {}", s);

Review comment:
       Remove this log statement.

##########
File path: 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom 
(https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = 
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, 
threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);

Review comment:
       Remove this log statement, or lower it to debug level.

##########
File path: 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
##########
@@ -35,81 +37,88 @@
   private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
 
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new HashMap<>();
+  protected Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom 
(https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = 
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, 
threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
     ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
+    // stop all child thread of schedulers
+    for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+      LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+      scheduler.getValue().stop();
+    }
+    this.executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("executor did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
     this.executor = null;
-    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for FIFOScheduler {}", name);
+    if (!schedulers.containsKey(name)) {
+      FIFOScheduler s = new FIFOScheduler(name, 5, TimeUnit.SECONDS);
+      LOGGER.info("new FIFOScheduler {}", name);
+      schedulers.put(name, s);
+      executor.execute(s);
     }
+    return schedulers.get(name);
   }
 
   public Scheduler createOrGetParallelScheduler(String name, int 
maxConcurrency) {
-    synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
+    LOGGER.info("locking for ParallelScheduler {} with {} maxConcurrency", 
name, maxConcurrency);

Review comment:
       Remove this log statement, or lower it to debug level.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to