Repository: asterixdb Updated Branches: refs/heads/master 15924f3cc -> 028537d1f
[NO ISSUE] += helper to ensure exception handling on cleanup Change-Id: I912ccef43c498e09fc9df4dbd5a3b6961d90aa33 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2395 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/028537d1 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/028537d1 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/028537d1 Branch: refs/heads/master Commit: 028537d1f6b5eb03902ef609e5976804e515730f Parents: 15924f3 Author: Michael Blow <mb...@apache.org> Authored: Thu Feb 15 19:22:14 2018 -0500 Committer: Michael Blow <mb...@apache.org> Committed: Thu Feb 15 18:45:32 2018 -0800 ---------------------------------------------------------------------- .../asterix/metadata/lock/DatasetLock.java | 2 +- asterixdb/asterix-transactions/pom.xml | 4 - .../management/service/logging/LogManager.java | 2 +- .../logging/LogManagerWithReplication.java | 2 +- .../org/apache/hyracks/api/util/InvokeUtil.java | 229 +++++++++++++++++++ .../control/nc/NodeControllerService.java | 2 +- .../org/apache/hyracks/util/InvokeUtil.java | 190 --------------- 7 files changed, 233 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/028537d1/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java index b35d02a..56da9d5 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java @@ -26,7 +26,7 @@ import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.metadata.IMetadataLock; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.util.InvokeUtil; +import org.apache.hyracks.api.util.InvokeUtil; public class DatasetLock implements IMetadataLock { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/028537d1/asterixdb/asterix-transactions/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml index ad044cb..57332ba 100644 --- a/asterixdb/asterix-transactions/pom.xml +++ b/asterixdb/asterix-transactions/pom.xml @@ -155,9 +155,5 @@ <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-util</artifactId> - </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/028537d1/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 4b154bd..e3788b7 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -55,7 +55,7 @@ import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.MutableLong; import org.apache.asterix.common.transactions.TxnLogFile; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; -import org.apache.hyracks.util.InvokeUtil; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/028537d1/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index 0bb8293..a1aec1a 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -28,7 +28,7 @@ import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; -import org.apache.hyracks.util.InvokeUtil; +import org.apache.hyracks.api.util.InvokeUtil; public class LogManagerWithReplication extends LogManager { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/028537d1/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java new file mode 100644 index 0000000..9b356a0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.util; + +import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class InvokeUtil { + + private static final Logger LOGGER = LogManager.getLogger(); + + private InvokeUtil() { + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible + * completes, the current thread will be re-interrupted, if the original operation was interrupted. + */ + public static void doUninterruptibly(Interruptible interruptible) { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible + * completes, the current thread will be re-interrupted, if the original operation was interrupted. + */ + public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. + * + * @return true if the original operation was interrupted, otherwise false + */ + public static boolean doUninterruptiblyGet(Interruptible interruptible) { + boolean interrupted = false; + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- contract states caller must handle + interrupted = true; + } + } + return interrupted; + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an + * exception after being previously interrupted, the current thread will be re-interrupted. + * + * @return true if the original operation was interrupted, otherwise false + */ + public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception { + boolean interrupted = false; + boolean success = false; + while (true) { + try { + interruptible.run(); + success = true; + break; + } catch (InterruptedException e) { // NOSONAR- contract states caller must handle + interrupted = true; + } finally { + if (!success && interrupted) { + Thread.currentThread().interrupt(); + } + } + } + return interrupted; + } + + public static boolean retryLoop(long duration, TimeUnit durationUnit, long delay, TimeUnit delayUnit, + Callable<Boolean> function) throws IOException { + long endTime = System.nanoTime() + durationUnit.toNanos(duration); + boolean first = true; + while (endTime - System.nanoTime() > 0) { + if (first) { + first = false; + } else { + try { + delayUnit.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + try { + if (function.call()) { + return true; + } + } catch (Exception e) { + // ignore, retry after delay + LOGGER.log(Level.DEBUG, "Ignoring exception on retryLoop attempt, will retry after delay", e); + } + } + return false; + } + + /** + * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException} or + * {@link InterruptedException}. Once the interruptible completes, the current thread will be re-interrupted, if + * the original operation was interrupted. + */ + public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (ClosedByInterruptException | InterruptedException e) { + LOGGER.error("IO operation Interrupted. Retrying..", e); + interrupted = true; + Thread.interrupted(); + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + @SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, instanceof of exception + public static void tryWithCleanups(ThrowingInterruptible action, ThrowingInterruptible... cleanups) + throws Exception { + Throwable savedT = null; + boolean suppressedInterrupted = false; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (ThrowingInterruptible cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException; + } else { + savedT = t; + } + } + } + } + if (savedT == null) { + return; + } + if (suppressedInterrupted) { + Thread.currentThread().interrupt(); + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else if (savedT instanceof Exception) { + throw (Exception) savedT; + } else { + throw HyracksDataException.create(savedT); + } + } + + @FunctionalInterface + public interface Interruptible { + void run() throws InterruptedException; + } + + @FunctionalInterface + public interface ThrowingInterruptible { + void run() throws Exception; // NOSONAR + } + + @FunctionalInterface + public interface ThrowingIOInterruptible { + void run() throws IOException, InterruptedException; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/028537d1/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index b1909dd..0f40b60 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -95,7 +95,7 @@ import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters; import org.apache.hyracks.util.ExitUtil; -import org.apache.hyracks.util.InvokeUtil; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.util.PidHelper; import org.apache.hyracks.util.trace.ITracer; import org.apache.hyracks.util.trace.Tracer; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/028537d1/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java deleted file mode 100644 index b8d8ce4..0000000 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.util; - -import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class InvokeUtil { - - private static final Logger LOGGER = LogManager.getLogger(); - - private InvokeUtil() { - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible - * completes, the current thread will be re-interrupted, if the original operation was interrupted. - */ - public static void doUninterruptibly(Interruptible interruptible) { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible - * completes, the current thread will be re-interrupted, if the original operation was interrupted. - */ - public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. - * - * @return true if the original operation was interrupted, otherwise false - */ - public static boolean doUninterruptiblyGet(Interruptible interruptible) { - boolean interrupted = false; - while (true) { - try { - interruptible.run(); - break; - } catch (InterruptedException e) { // NOSONAR- contract states caller must handle - interrupted = true; - } - } - return interrupted; - } - - /** - * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an - * exception after being previously interrupted, the current thread will be re-interrupted. - * - * @return true if the original operation was interrupted, otherwise false - */ - public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception { - boolean interrupted = false; - boolean success = false; - while (true) { - try { - interruptible.run(); - success = true; - break; - } catch (InterruptedException e) { // NOSONAR- contract states caller must handle - interrupted = true; - } finally { - if (!success && interrupted) { - Thread.currentThread().interrupt(); - } - } - } - return interrupted; - } - - public static boolean retryLoop(long duration, TimeUnit durationUnit, long delay, TimeUnit delayUnit, - Callable<Boolean> function) throws IOException { - long endTime = System.nanoTime() + durationUnit.toNanos(duration); - boolean first = true; - while (endTime - System.nanoTime() > 0) { - if (first) { - first = false; - } else { - try { - delayUnit.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - try { - if (function.call()) { - return true; - } - } catch (Exception e) { - // ignore, retry after delay - LOGGER.log(Level.DEBUG, "Ignoring exception on retryLoop attempt, will retry after delay", e); - } - } - return false; - } - - /** - * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException} or - * {@link InterruptedException}. Once the interruptible completes, the current thread will be re-interrupted, if - * the original operation was interrupted. - */ - public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException { - boolean interrupted = false; - try { - while (true) { - try { - interruptible.run(); - break; - } catch (ClosedByInterruptException | InterruptedException e) { - LOGGER.error("IO operation Interrupted. Retrying..", e); - interrupted = true; - Thread.interrupted(); - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - @FunctionalInterface - public interface Interruptible { - void run() throws InterruptedException; - } - - @FunctionalInterface - public interface ThrowingInterruptible { - void run() throws Exception; // NOSONAR - } - - @FunctionalInterface - public interface ThrowingIOInterruptible { - void run() throws IOException, InterruptedException; - } -}