Repository: asterixdb Updated Branches: refs/heads/master 82d659f6b -> 8fa6babc0
[NO ISSUE][HYR] Notify CC of NC shutdown only after shutdown is complete - close NC IPC manager after sending shutdown notification to CC Change-Id: Idde1f69a0e0a9a948898d9271441ca95485b77f4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2159 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/8fa6babc Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/8fa6babc Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/8fa6babc Branch: refs/heads/master Commit: 8fa6babc0ac90567ef3d7aa20394c0b2ea219089 Parents: 82d659f Author: Michael Blow <[email protected]> Authored: Fri Nov 17 18:03:18 2017 -0500 Committer: Michael Blow <[email protected]> Committed: Fri Nov 17 15:04:20 2017 -0800 ---------------------------------------------------------------------- .../asterix/common/utils/InterruptUtil.java | 118 -------------- .../apache/asterix/common/utils/InvokeUtil.java | 155 +++++++++++++++++++ .../management/service/logging/LogManager.java | 8 +- .../java/org/apache/asterix/aoya/Utils.java | 62 +++++--- .../control/cc/work/NotifyShutdownWork.java | 10 +- .../hyracks/control/nc/NodeControllerIPCI.java | 2 +- .../control/nc/NodeControllerService.java | 7 + .../hyracks/control/nc/task/ShutdownTask.java | 19 +-- 8 files changed, 213 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java deleted file mode 100644 index 4c65c66..0000000 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.utils; - -public class InterruptUtil { - /** - * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible - * completes, the current thread will be re-interrupted, if the original operation was interrupted. - */ - public static void doUninterruptibly(Interruptible interruptible) { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible - * completes, the current thread will be re-interrupted, if the original operation was interrupted. 
- */ - public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. - * - * @return true if the original operation was interrupted, otherwise false - */ - public static boolean doUninterruptiblyGet(Interruptible interruptible) { - boolean interrupted = false; - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- contract states caller must handle - interrupted = true; - } - } - return interrupted; - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an - * exception after being previously interrupted, the current thread will be re-interrupted. 
- * - * @return true if the original operation was interrupted, otherwise false - */ - public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception { - boolean interrupted = false; - boolean success = false; - while (true) { - try { - interruptible.run(); - success = true; - break; - } catch (InterruptedException e) { // NOSONAR- contract states caller must handle - interrupted = true; - } finally { - if (!success && interrupted) { - Thread.currentThread().interrupt(); - } - } - } - return interrupted; - } - - @FunctionalInterface - public interface Interruptible { - void run() throws InterruptedException; - } - - @FunctionalInterface - public interface ThrowingInterruptible { - void run() throws Exception; // NOSONAR - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java new file mode 100644 index 0000000..c7ac0f4 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.utils; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class InvokeUtil { + + private static final Logger LOGGER = Logger.getLogger(InvokeUtil.class.getName()); + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible + * completes, the current thread will be re-interrupted, if the original operation was interrupted. + */ + public static void doUninterruptibly(Interruptible interruptible) { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible + * completes, the current thread will be re-interrupted, if the original operation was interrupted.
+ */ + public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. + * + * @return true if the original operation was interrupted, otherwise false + */ + public static boolean doUninterruptiblyGet(Interruptible interruptible) { + boolean interrupted = false; + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- contract states caller must handle + interrupted = true; + } + } + return interrupted; + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an + * exception after being previously interrupted, the current thread will be re-interrupted.
+ * + * @return true if the original operation was interrupted, otherwise false + */ + public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception { + boolean interrupted = false; + while (true) { + try { + interruptible.run(); + return interrupted; + } catch (InterruptedException e) { // NOSONAR- contract states caller must handle + // do not re-interrupt here: a pending interrupt would make the retried run() fail immediately + interrupted = true; + } catch (Throwable t) { // NOSONAR- rethrown unchanged below (precise rethrow keeps 'throws Exception') + // giving up on a non-interrupt failure: restore the interrupt status swallowed by earlier retries + if (interrupted) { + Thread.currentThread().interrupt(); + } + throw t; + } + } + } + + public static boolean retryLoop(long duration, TimeUnit durationUnit, long delay, TimeUnit delayUnit, + Callable<Boolean> function) throws IOException { + long endTime = System.nanoTime() + durationUnit.toNanos(duration); + boolean first = true; + while (endTime - System.nanoTime() > 0) { + if (first) { + first = false; + } else { + try { + delayUnit.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + try { + if (function.call()) { + return true; + } + } catch (Exception e) { + // ignore, retry after delay + LOGGER.log(Level.FINE, "Ignoring exception on retryLoop attempt, will retry after delay", e); + } + } + return false; + } + + @FunctionalInterface + public interface Interruptible { + void run() throws InterruptedException; + } + + @FunctionalInterface + public interface ThrowingInterruptible { + void run() throws Exception; // NOSONAR + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index
4d671f3..dd0a5c7 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -51,7 +51,7 @@ import org.apache.asterix.common.transactions.LogManagerProperties; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.MutableLong; import org.apache.asterix.common.transactions.TxnLogFile; -import org.apache.asterix.common.utils.InterruptUtil; +import org.apache.asterix.common.utils.InvokeUtil; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; public class LogManager implements ILogManager, ILifeCycleComponent { @@ -655,7 +655,7 @@ class LogFlusher implements Callable<Boolean> { public void terminate() { // make sure the LogFlusher thread started before terminating it. - InterruptUtil.doUninterruptibly(started::acquire); + InvokeUtil.doUninterruptibly(started::acquire); stopping = true; @@ -665,7 +665,7 @@ class LogFlusher implements Callable<Boolean> { currentFlushPage.stop(); } // finally we put a POISON_PILL onto the flushQ to indicate to the flusher it is time to exit - InterruptUtil.doUninterruptibly(() -> flushQ.put(POISON_PILL)); + InvokeUtil.doUninterruptibly(() -> flushQ.put(POISON_PILL)); } @Override @@ -675,7 +675,7 @@ class LogFlusher implements Callable<Boolean> { try { while (true) { flushPage = null; - interrupted = InterruptUtil.doUninterruptiblyGet(() -> flushPage = flushQ.take()) || interrupted; + interrupted = InvokeUtil.doUninterruptiblyGet(() -> flushPage = flushQ.take()) || interrupted; if (flushPage == POISON_PILL) { return true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java b/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java index de9ec90..675d4ab 100644 --- a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java +++ b/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/Utils.java @@ -28,6 +28,8 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.List; import java.util.Scanner; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import javax.xml.bind.JAXBContext; @@ -35,7 +37,17 @@ import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; import javax.xml.bind.Unmarshaller; -import org.apache.commons.httpclient.*; +import org.apache.asterix.common.configuration.AsterixConfiguration; +import org.apache.asterix.common.utils.InvokeUtil; +import org.apache.asterix.event.schema.yarnCluster.Cluster; +import org.apache.asterix.event.schema.yarnCluster.Node; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpException; +import org.apache.commons.httpclient.HttpMethod; +import org.apache.commons.httpclient.HttpMethodRetryHandler; +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.NameValuePair; +import org.apache.commons.httpclient.NoHttpResponseException; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.params.HttpMethodParams; @@ -49,10 +61,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.asterix.common.configuration.AsterixConfiguration; -import org.apache.asterix.event.schema.yarnCluster.Cluster; -import org.apache.asterix.event.schema.yarnCluster.Node; - public class Utils { private 
Utils() { @@ -105,11 +113,16 @@ public class Utils { //do nothing... this is expected } //now let's test that the instance is really down, or throw an exception - try { - executeHTTPCall(method); - } catch (ConnectException e) { - return; - } + if (InvokeUtil.retryLoop(1, TimeUnit.MINUTES, 500, TimeUnit.MILLISECONDS, () -> { + try { + executeHTTPCall(method); + } catch (ConnectException e) { // expected: connection refused means the instance is down + return true; + } + return false; + })) { + return; + } throw new IOException("Instance did not shut down cleanly."); } @@ -142,7 +155,7 @@ public class Utils { if (result == null) { return false; } - if(method.getStatusCode() != HttpStatus.SC_OK){ + if (method.getStatusCode() != HttpStatus.SC_OK) { return false; } return true; @@ -237,7 +250,7 @@ public class Utils { * @throws IOException */ public static void listBackups(Configuration conf, String confDirRel, String instance) throws IOException { - List<String> backups = getBackups(conf,confDirRel,instance); + List<String> backups = getBackups(conf, confDirRel, instance); if (backups.size() != 0) { System.out.println("Backups for instance " + instance + ": "); for (String name : backups) { @@ -247,20 +260,22 @@ public class Utils { System.out.println("No backups found for instance " + instance + "."); } } - /** - * Return the available snapshot names - * @param conf - * @param confDirRel - * @param instance - * @return - * @throws IOException - */ - public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException{ + + /** + * Return the available snapshot names + * + * @param conf + * @param confDirRel + * @param instance + * @return + * @throws IOException + */ + public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException { FileSystem fs = FileSystem.get(conf); Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups"); FileStatus[] backups = fs.listStatus(backupFolder);
List<String> backupNames = new ArrayList<String>(); - for(FileStatus f: backups){ + for (FileStatus f : backups) { backupNames.add(f.getPath().getName()); } return backupNames; @@ -441,8 +456,8 @@ public class Utils { return waitForLiveness(appId, false, true, message, yarnClient, "", null, port); } - public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) throws YarnException, - IOException, JAXBException { + public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) + throws YarnException, IOException, JAXBException { return waitForLiveness(appId, false, false, "", yarnClient, "", null, port); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java index 5119022..83cbb91 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyShutdownWork.java @@ -27,9 +27,9 @@ import org.apache.hyracks.control.common.work.SynchronizableWork; public class NotifyShutdownWork extends SynchronizableWork { + private static final Logger LOGGER = Logger.getLogger(NotifyShutdownWork.class.getName()); private final ClusterControllerService ccs; private final String nodeId; - private static Logger LOGGER = Logger.getLogger(NotifyShutdownWork.class.getName()); public NotifyShutdownWork(ClusterControllerService ccs,
String nodeId) { this.ccs = ccs; @@ -41,8 +41,12 @@ public class NotifyShutdownWork extends SynchronizableWork { public void doRun() { // Triggered remotely by a NC to notify that the NC is shutting down. ShutdownRun sRun = ccs.getShutdownRun(); - LOGGER.info("Received shutdown acknowledgement from NC ID:" + nodeId); - sRun.notifyShutdown(nodeId); + if (sRun != null) { + LOGGER.info("Received shutdown acknowledgement from node " + nodeId); + sRun.notifyShutdown(nodeId); + } else { + LOGGER.info("Received unsolicited shutdown notification from node " + nodeId); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index c54f153..b220039 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -122,7 +122,7 @@ final class NodeControllerIPCI implements IIPCI { case SHUTDOWN_REQUEST: final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn; - ncs.getExecutor().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService())); + ncs.getExecutor().submit(new ShutdownTask(sdrf.isTerminateNCService())); return; case THREAD_DUMP_REQUEST: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 9dd9536..a3a9ac5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -409,6 +409,13 @@ public class NodeControllerService implements IControllerService { heartbeatThread.interrupt(); heartbeatThread.join(1000); // give it 1s to stop gracefully } + try { + ccs.notifyShutdown(id); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception notifying CC of shutdown", e); + } + ipc.stop(); + LOGGER.log(Level.INFO, "Stopped NodeControllerService"); } else { LOGGER.log(Level.SEVERE, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack), http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8fa6babc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java index e9cf3cb..4dd57f2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ShutdownTask.java @@ -19,34 
+19,17 @@ package org.apache.hyracks.control.nc.task; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.hyracks.control.common.base.IClusterController; -import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.util.ExitUtil; public class ShutdownTask implements Runnable { - private static final Logger LOGGER = Logger.getLogger(ShutdownTask.class.getName()); - private final NodeControllerService ncs; private final boolean terminateNCService; - public ShutdownTask(NodeControllerService ncs, boolean terminateNCService) { - this.ncs = ncs; + public ShutdownTask(boolean terminateNCService) { this.terminateNCService = terminateNCService; } @Override - @SuppressWarnings("squid:S1147") // Runtime.exit() public void run() { - IClusterController ccs = ncs.getClusterController(); - try { - ccs.notifyShutdown(ncs.getId()); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception notifying CC of shutdown acknowledgment", e); - // proceed with shutdown - } - ExitUtil.exit(terminateNCService ? 99 : 0); }
