This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 501efad92b Fixes for process exit, added ExitCodesIT (#5811) 501efad92b is described below commit 501efad92bee10aad2910b0984752879d5e42322 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Aug 27 13:53:29 2025 -0400 Fixes for process exit, added ExitCodesIT (#5811) Various changes to clean up the exit process of the server processes. Added ExitCodesIT to confirm exit codes under different conditions. --- .../accumulo/core/clientImpl/ClientContext.java | 17 +- .../accumulo/core/lock/ServiceLockSupport.java | 10 +- .../java/org/apache/accumulo/core/util/Halt.java | 2 +- .../org/apache/accumulo/server/AbstractServer.java | 53 +-- .../org/apache/accumulo/server/ServerContext.java | 3 + .../accumulo/server/manager/LiveTServerSet.java | 2 +- .../accumulo/server/mem/LowMemoryDetector.java | 2 +- .../org/apache/accumulo/compactor/Compactor.java | 395 ++++++++++----------- .../apache/accumulo/gc/SimpleGarbageCollector.java | 14 +- .../java/org/apache/accumulo/manager/Manager.java | 15 +- .../accumulo/manager/TabletGroupWatcher.java | 4 +- .../org/apache/accumulo/tserver/ScanServer.java | 90 ++--- .../org/apache/accumulo/tserver/TabletServer.java | 14 +- .../accumulo/tserver/log/TabletServerLogger.java | 2 +- .../accumulo/tserver/tablet/MinorCompactor.java | 2 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- .../accumulo/test/functional/ExitCodesIT.java | 334 +++++++++++++++++ 17 files changed, 635 insertions(+), 326 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 79ac3440a4..dc5a03288e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -918,21 +918,26 @@ public class ClientContext implements AccumuloClient { @Override public synchronized void close() { if (closed.compareAndSet(false, true)) { - if (zooCacheCreated.get()) { - zooCache.get().close(); - } - if (zooKeeperOpened.get()) { - zooSession.get().close(); - } if (thriftTransportPool != null) { + log.debug("Closing Thrift Transport Pool"); thriftTransportPool.shutdown(); } if (scannerReadaheadPool != null) { + log.debug("Closing Scanner ReadAhead Pool"); scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down } if (cleanupThreadPool != null) { + log.debug("Closing Cleanup ThreadPool"); cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute } + if (zooCacheCreated.get()) { + log.debug("Closing ZooCache"); + zooCache.get().close(); + } + if (zooKeeperOpened.get()) { + log.debug("Closing ZooSession"); + zooSession.get().close(); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java index 19be1b6f4f..fad535fefa 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@ -82,13 +82,13 @@ public class ServiceLockSupport { Halt.halt(0, server + " lock in zookeeper lost (reason = " + reason + "), exiting cleanly because shutdown is complete."); } else { - Halt.halt(-1, server + " lock in zookeeper lost (reason = " + reason + "), exiting!"); + Halt.halt(1, server + " lock in zookeeper lost (reason = " + reason + "), exiting!"); } } @Override public void 
unableToMonitorLockNode(final Exception e) { - Halt.halt(-1, "FATAL: No longer able to monitor " + server + " lock node", e); + Halt.halt(1, "FATAL: No longer able to monitor " + server + " lock node", e); } @Override @@ -96,7 +96,7 @@ public class ServiceLockSupport { LOG.debug("Acquired {} lock", server); if (acquiredLock || failedToAcquireLock) { - Halt.halt(-1, "Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock); + Halt.halt(1, "Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock); } acquiredLock = true; @@ -111,11 +111,11 @@ public class ServiceLockSupport { String msg = "Failed to acquire " + server + " lock due to incorrect ZooKeeper authentication."; LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(-1, msg); + Halt.halt(1, msg); } if (acquiredLock) { - Halt.halt(-1, + Halt.halt(1, "Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/Halt.java b/core/src/main/java/org/apache/accumulo/core/util/Halt.java index 48e82deb9b..bea10e9b12 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Halt.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Halt.java @@ -40,7 +40,7 @@ public class Halt { halt(status, msg, null, runnable); } - public static void halt(final int status, final String msg, final Throwable exception, + private static void halt(final int status, final String msg, final Throwable exception, final Runnable runnable) { try { diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index e6543afe17..0b94dc1203 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -76,24 +76,7 @@ public abstract class AbstractServer } public static void startServer(AbstractServer server, Logger LOG) throws Exception { - try { - server.runServer(); - } catch (Throwable e) { - System.err - .println(server.getClass().getSimpleName() + " died, exception thrown from runServer."); - e.printStackTrace(); - LOG.error("{} died, exception thrown from runServer.", server.getClass().getSimpleName(), e); - throw e; - } finally { - try { - server.close(); - } catch (Throwable e) { - System.err.println("Exception thrown while closing " + server.getClass().getSimpleName()); - e.printStackTrace(); - LOG.error("Exception thrown while closing {}", server.getClass().getSimpleName(), e); - throw e; - } - } + server.runServer(); } private final MetricSource metricSource; @@ -112,6 +95,7 @@ public abstract class AbstractServer private volatile Thread verificationThread; private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); private final AtomicBoolean shutdownComplete = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); protected AbstractServer(ServerId.Type serverType, ConfigOpts opts, Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) { @@ -203,7 +187,8 @@ public abstract class AbstractServer * * @param isIdle whether the server is idle */ - protected void updateIdleStatus(boolean isIdle) { + // public for ExitCodesIT + public void updateIdleStatus(boolean isIdle) { boolean shouldResetIdlePeriod = !isIdle || idleReportingPeriodMillis == 0; boolean hasIdlePeriodStarted = idlePeriodTimer != null; boolean 
hasExceededIdlePeriod = @@ -290,7 +275,10 @@ public abstract class AbstractServer */ public void runServer() throws Exception { final AtomicReference<Throwable> err = new AtomicReference<>(); - serverThread = new Thread(TraceUtil.wrap(this), applicationName); + serverThread = new Thread(TraceUtil.wrap(() -> { + this.run(); + close(); + }), applicationName); serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception)); serverThread.start(); serverThread.join(); @@ -331,7 +319,8 @@ public abstract class AbstractServer return bindAddress; } - protected TServer getThriftServer() { + // public for ExitCodesIT + public TServer getThriftServer() { if (thriftServer == null) { return null; } @@ -451,7 +440,7 @@ public abstract class AbstractServer log.trace( "ServiceLockVerificationThread - checking ServiceLock existence in ZooKeeper"); if (lock != null && !lock.verifyLockAtSource()) { - Halt.halt(-1, "Lock verification thread could not find lock"); + Halt.halt(1, "Lock verification thread could not find lock"); } // Need to sleep, not yield when the thread priority is greater than NORM_PRIORITY // so that this thread does not get immediately rescheduled. @@ -476,8 +465,21 @@ public abstract class AbstractServer @Override public void close() { - if (context != null) { - context.close(); + + if (closed.compareAndSet(false, true)) { + + // Must set shutdown as completed before calling ServerContext.close(). + // ServerContext.close() calls ClientContext.close() -> + // ZooSession.close() which removes all of the ephemeral nodes and + // forces the watches to fire. The ServiceLockWatcher has a reference + // to shutdownComplete and will terminate the JVM with a 0 exit code + // if true. Otherwise it will exit with a non-zero exit code. + getShutdownComplete().set(true); + + if (context != null) { + context.getLowMemoryDetector().logGCInfo(getConfiguration()); + context.close(); + } } } @@ -488,4 +490,7 @@ public abstract class AbstractServer } } + public void requestShutdownForTests() { + shutdownRequested.set(true); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java index 4363648ff5..a6919bed3e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java @@ -522,9 +522,11 @@ public class ServerContext extends ClientContext { getMetricsInfo().close(); } if (sharedSchedExecutorCreated.get()) { + log.debug("Shutting down shared executor pool"); getScheduledExecutor().shutdownNow(); } if (sharedMetadataWriterCreated.get()) { + log.debug("Shutting down shared metadata conditional writer"); try { ConditionalWriter writer = sharedMetadataWriter.get(); if (writer != null) { @@ -535,6 +537,7 @@ public class ServerContext extends ClientContext { } } if (sharedUserWriterCreated.get()) { + log.debug("Shutting down shared user conditional writer"); try { ConditionalWriter writer = sharedUserWriter.get(); if (writer != null) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index a178f0b44b..d367f356eb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -519,7 +519,7 @@ public class LiveTServerSet implements ZooCacheWatcher 
{ try { context.getZooSession().asReaderWriter().recursiveDelete(slp.toString(), SKIP); } catch (Exception e) { - Halt.halt(-1, "error removing tablet server lock", e); + Halt.halt(1, "error removing tablet server lock", e); } context.getZooCache().clear(slp.toString()); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java index 8d745e7774..ca320b3cd1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java +++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java @@ -193,7 +193,7 @@ public class LowMemoryDetector { } if (maxIncreaseInCollectionTime > keepAliveTimeout) { - Halt.halt(-1, "Garbage collection may be interfering with lock keep-alive. Halting."); + Halt.halt(1, "Garbage collection may be interfering with lock keep-alive. Halting."); } localState.lastMemorySize = freeMemory; diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 64bf2b0200..9772d4bd9f 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -829,237 +829,236 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); LOG.info("Compactor started, waiting for work"); - try { - - final AtomicReference<Throwable> err = new AtomicReference<>(); - final LogSorter logSorter = new LogSorter(this); - long nextSortLogsCheckTime = System.currentTimeMillis(); - while (!isShutdownRequested()) { - if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); - break; - } - try { - // mark compactor as idle while not in the compaction loop - updateIdleStatus(true); + final AtomicReference<Throwable> err = new AtomicReference<>(); + final LogSorter logSorter = new LogSorter(this); + long nextSortLogsCheckTime = System.currentTimeMillis(); - currentCompactionId.set(null); - err.set(null); - JOB_HOLDER.reset(); - - if (System.currentTimeMillis() > nextSortLogsCheckTime) { - // Attempt to process all existing log sorting work serially in this thread. - // When no work remains, this call will return so that we can look for compaction - // work. - LOG.debug("Checking to see if any recovery logs need sorting"); + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Server process thread has been interrupted, shutting down"); + break; + } + try { + // mark compactor as idle while not in the compaction loop + updateIdleStatus(true); + + currentCompactionId.set(null); + err.set(null); + JOB_HOLDER.reset(); + + if (System.currentTimeMillis() > nextSortLogsCheckTime) { + // Attempt to process all existing log sorting work serially in this thread. + // When no work remains, this call will return so that we can look for compaction + // work. 
+ LOG.debug("Checking to see if any recovery logs need sorting"); + try { nextSortLogsCheckTime = logSorter.sortLogsIfNeeded(); + } catch (KeeperException e) { + LOG.error("Error sorting logs", e); } + } - performFailureProcessing(errorHistory); + performFailureProcessing(errorHistory); - TExternalCompactionJob job; - try { - TNextCompactionJob next = getNextJob(getNextId()); - job = next.getJob(); - if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.getResourceGroup()); - UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); - continue; - } - if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { - throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() - + " does not match supplied eci " + currentCompactionId.get()); - } - } catch (RetriesExceededException e2) { - LOG.warn("Retries exceeded getting next job. Retrying..."); + TExternalCompactionJob job; + try { + TNextCompactionJob next = getNextJob(getNextId()); + job = next.getJob(); + if (!job.isSetExternalCompactionId()) { + LOG.trace("No external compactions in queue {}", this.getResourceGroup()); + UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); continue; } - LOG.debug("Received next compaction job: {}", job); + if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { + throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() + + " does not match supplied eci " + currentCompactionId.get()); + } + } catch (RetriesExceededException e2) { + LOG.warn("Retries exceeded getting next job. Retrying..."); + continue; + } + LOG.debug("Received next compaction job: {}", job); - final LongAdder totalInputEntries = new LongAdder(); - final LongAdder totalInputBytes = new LongAdder(); - final CountDownLatch started = new CountDownLatch(1); - final CountDownLatch stopped = new CountDownLatch(1); + final LongAdder totalInputEntries = new LongAdder(); + final LongAdder totalInputBytes = new LongAdder(); + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch stopped = new CountDownLatch(1); - final FileCompactorRunnable fcr = - createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); + final FileCompactorRunnable fcr = + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); - final Thread compactionThread = Threads.createNonCriticalThread( - "Compaction job for tablet " + job.getExtent().toString(), fcr); + final Thread compactionThread = Threads.createNonCriticalThread( + "Compaction job for tablet " + job.getExtent().toString(), fcr); - JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); + JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); - try { - // mark compactor as busy while compacting - updateIdleStatus(false); + try { + // mark compactor as busy while compacting + updateIdleStatus(false); + try { // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set fcr.initialize(); - - compactionThread.start(); // start the compactionThread - started.await(); // wait until the compactor is started - final long inputEntries = totalInputEntries.sum(); - final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); - LOG.debug("Progress checks will occur every {} seconds", waitTime); - String percentComplete = "unknown"; - - while (!stopped.await(waitTime, TimeUnit.SECONDS)) { - List<CompactionInfo> running = - 
org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); - if (!running.isEmpty()) { - // Compaction has started. There should only be one in the list - CompactionInfo info = running.get(0); - if (info != null) { - final long entriesRead = info.getEntriesRead(); - final long entriesWritten = info.getEntriesWritten(); - if (inputEntries > 0) { - percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); - } - String message = String.format( - "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", - entriesRead, inputEntries, percentComplete, "%", entriesWritten); - watcher.run(); - try { - LOG.debug("Updating coordinator with compaction progress: {}.", message); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, - entriesWritten, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - } catch (RetriesExceededException e) { - LOG.warn("Error updating coordinator with compaction progress, error: {}", - e.getMessage()); - } - } - } else { - LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); - } - } - compactionThread.join(); - LOG.trace("Compaction thread finished."); - // Run the watcher again to clear out the finished compaction and set the - // stuck count to zero. - watcher.run(); - - if (err.get() != null) { - // maybe the error occured because the table was deleted or something like that, so - // force a cancel check to possibly reduce noise in the logs - checkIfCanceled(); + } catch (RetriesExceededException e) { + LOG.error( + "Error starting FileCompactableRunnable, cancelling compaction and moving to next job.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); } + continue; + } - if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() - || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { - LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); - try { - TCompactionStatusUpdate update = - new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", - -1, -1, -1, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job, InterruptedException.class.getName()); - cancelled.incrementAndGet(); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction cancellation.", e); - } finally { - currentCompactionId.set(null); - } - } else if (err.get() != null) { - final KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - try { - LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(), - -1, -1, -1, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job, err.get().getClass().getName()); - failed.incrementAndGet(); - errorHistory.addError(fromThriftExtent.tableId(), err.get()); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent, e); - } finally { - currentCompactionId.set(null); - } - } else { - try { - LOG.trace("Updating coordinator with compaction completion."); 
- updateCompactionCompleted(job, JOB_HOLDER.getStats()); - completed.incrementAndGet(); - // job completed successfully, clear the error history - errorHistory.clear(); - } catch (RetriesExceededException e) { - LOG.error( - "Error updating coordinator with compaction completion, cancelling compaction.", - e); + compactionThread.start(); // start the compactionThread + started.await(); // wait until the compactor is started + final long inputEntries = totalInputEntries.sum(); + final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); + LOG.debug("Progress checks will occur every {} seconds", waitTime); + String percentComplete = "unknown"; + + while (!stopped.await(waitTime, TimeUnit.SECONDS)) { + List<CompactionInfo> running = + org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); + if (!running.isEmpty()) { + // Compaction has started. There should only be one in the list + CompactionInfo info = running.get(0); + if (info != null) { + final long entriesRead = info.getEntriesRead(); + final long entriesWritten = info.getEntriesWritten(); + if (inputEntries > 0) { + percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); + } + String message = String.format( + "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", + entriesRead, inputEntries, percentComplete, "%", entriesWritten); + watcher.run(); try { - cancel(job.getExternalCompactionId()); - } catch (TException e1) { - LOG.error("Error cancelling compaction.", e1); + LOG.debug("Updating coordinator with compaction progress: {}.", message); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, + entriesWritten, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + } catch (RetriesExceededException e) { + LOG.warn("Error updating coordinator with compaction progress, error: {}", + e.getMessage()); } - } finally { - currentCompactionId.set(null); } + } else { + LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } catch (RuntimeException e1) { - LOG.error( - "Compactor thread was interrupted waiting for compaction to start, cancelling job", - e1); + } + compactionThread.join(); + LOG.trace("Compaction thread finished."); + // Run the watcher again to clear out the finished compaction and set the + // stuck count to zero. 
+ watcher.run(); + + if (err.get() != null) { + // maybe the error occured because the table was deleted or something like that, so + // force a cancel check to possibly reduce noise in the logs + checkIfCanceled(); + } + + if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() + || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { + LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); try { - cancel(job.getExternalCompactionId()); - } catch (TException e2) { - LOG.error("Error cancelling compaction.", e2); + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job, InterruptedException.class.getName()); + cancelled.incrementAndGet(); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction cancellation.", e); + } finally { + currentCompactionId.set(null); } - } finally { - currentCompactionId.set(null); + } else if (err.get() != null) { + final KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + try { + LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, + "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, + fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job, err.get().getClass().getName()); + failed.incrementAndGet(); + errorHistory.addError(fromThriftExtent.tableId(), err.get()); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent, e); + } finally { + currentCompactionId.set(null); + } + } else { + try { + LOG.trace("Updating coordinator with compaction completion."); + updateCompactionCompleted(job, JOB_HOLDER.getStats()); + completed.incrementAndGet(); + // job completed successfully, clear the error history + errorHistory.clear(); + } catch (RetriesExceededException e) { + LOG.error( + "Error updating coordinator with compaction completion, cancelling compaction.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); + } + } finally { + currentCompactionId.set(null); + } + } + } catch (RuntimeException e1) { + LOG.error( + "Compactor thread was interrupted waiting for compaction to start, cancelling job", + e1); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e2) { + LOG.error("Error cancelling compaction.", e2); + } + } finally { + currentCompactionId.set(null); - // mark compactor as idle after compaction completes - updateIdleStatus(true); + // mark compactor as idle after compaction completes + updateIdleStatus(true); - // In the case where there is an error in the foreground code the background compaction - // may still be running. Must cancel it before starting another iteration of the loop to - // avoid multiple threads updating shared state. - while (compactionThread.isAlive()) { - compactionThread.interrupt(); - compactionThread.join(1000); - } + // In the case where there is an error in the foreground code the background compaction + // may still be running. 
Must cancel it before starting another iteration of the loop to + // avoid multiple threads updating shared state. + while (compactionThread.isAlive()) { + compactionThread.interrupt(); + compactionThread.join(1000); } - } catch (InterruptedException e) { - LOG.info("Interrupt Exception received, shutting down"); - gracefulShutdown(getContext().rpcCreds()); } - } // end while - } catch (Exception e) { - LOG.error("Unhandled error occurred in Compactor", e); - } finally { - // Shutdown local thrift server - LOG.debug("Stopping Thrift Servers"); - if (getThriftServer() != null) { - getThriftServer().stop(); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + gracefulShutdown(getContext().rpcCreds()); } + } // end while, shutdown requested - try { - LOG.debug("Closing filesystems"); - VolumeManager mgr = getContext().getVolumeManager(); - if (null != mgr) { - mgr.close(); - } - } catch (IOException e) { - LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); - } + // Shutdown local thrift server + LOG.debug("Stopping Thrift Servers"); + if (getThriftServer() != null) { + getThriftServer().stop(); + } - getContext().getLowMemoryDetector().logGCInfo(getConfiguration()); - super.close(); - getShutdownComplete().set(true); - LOG.info("stop requested. exiting ... "); - try { - if (null != compactorLock) { - compactorLock.unlock(); - } - } catch (Exception e) { - LOG.warn("Failed to release compactor lock", e); + try { + LOG.debug("Closing filesystems"); + VolumeManager mgr = getContext().getVolumeManager(); + if (null != mgr) { + mgr.close(); } + } catch (IOException e) { + LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); } - } public static void main(String[] args) throws Exception { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 25383876c5..5b09324d4a 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -93,7 +93,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { private final Timer lastCompactorCheck = Timer.startNew(); - SimpleGarbageCollector(ConfigOpts opts, String[] args) { + protected SimpleGarbageCollector(ConfigOpts opts, String[] args) { super(ServerId.Type.GARBAGE_COLLECTOR, opts, ServerContext::new, args); final AccumuloConfiguration conf = getConfiguration(); @@ -352,15 +352,6 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { gracefulShutdown(getContext().rpcCreds()); } } - super.close(); - getShutdownComplete().set(true); - log.info("stop requested. exiting ... 
"); - try { - gcLock.unlock(); - } catch (Exception e) { - log.warn("Failed to release GarbageCollector lock", e); - } - } private void incrementStatsForRun(GCRun gcRun) { @@ -370,7 +361,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { status.current.errors += gcRun.getErrorsStat(); } - private void logStats() { + // public for ExitCodesIT + public void logStats() { log.info("Number of data file candidates for deletion: {}", status.current.candidates); log.info("Number of data file candidates still in use: {}", status.current.inUse); log.info("Number of successfully deleted data files: {}", status.current.deleted); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 7805cc85ab..cb50ebae84 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1206,7 +1206,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { break; } try { - Thread.sleep(500); + mainWait(); } catch (InterruptedException e) { log.info("Interrupt Exception received, shutting down"); gracefulShutdown(context.rpcCreds()); @@ -1257,14 +1257,11 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { throw new IllegalStateException("Exception waiting on watcher", e); } } - super.close(); - getShutdownComplete().set(true); - log.info("stop requested. exiting ... "); - try { - managerLock.unlock(); - } catch (Exception e) { - log.warn("Failed to release Manager lock", e); - } + } + + // method exists for ExitCodesIT + public void mainWait() throws InterruptedException { + Thread.sleep(500); } protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 0658262f56..975ac8f245 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -478,7 +478,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { Set<TServerInstance> filteredServersToShutdown = new HashSet<>(tableMgmtParams.getServersToShutdown()); - while (iter.hasNext()) { + while (iter.hasNext() && !manager.isShutdownRequested()) { final TabletManagement mti = iter.next(); if (mti == null) { throw new IllegalStateException("State store returned a null ManagerTabletInfo object"); @@ -728,7 +728,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { int[] oldCounts = new int[TabletState.values().length]; boolean lookForTabletsNeedingVolReplacement = true; - while (manager.stillManager()) { + while (manager.stillManager() && !manager.isShutdownRequested()) { if (!eventHandler.isNeedsFullScan()) { // If an event handled by the EventHandler.RangeProcessor indicated // that we need to do a full scan, then do it. 
Otherwise wait a bit diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index a5562e44bf..f218f8f71f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -400,67 +400,51 @@ public class ScanServer extends AbstractServer "Log sorting for tablet recovery is disabled, SSERV_WAL_SORT_MAX_CONCURRENT is less than 1."); } - try { - while (!isShutdownRequested()) { - if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); - break; - } - try { - Thread.sleep(1000); - updateIdleStatus(sessionManager.getActiveScans().isEmpty() - && tabletMetadataCache.estimatedSize() == 0); - } catch (InterruptedException e) { - LOG.info("Interrupt Exception received, shutting down"); - gracefulShutdown(getContext().rpcCreds()); - } - } - } finally { - // Wait for scans to got to zero - while (!sessionManager.getActiveScans().isEmpty()) { - LOG.debug("Waiting on {} active scans to complete.", - sessionManager.getActiveScans().size()); - UtilWaitThread.sleep(1000); + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Server process thread has been interrupted, shutting down"); + break; } - - LOG.debug("Stopping Thrift Servers"); - getThriftServer().stop(); - try { - LOG.info("Removing server scan references"); - this.getContext().getAmple().scanServerRefs().delete(getAdvertiseAddress().toString(), - serverLockUUID); - } catch (Exception e) { - LOG.warn("Failed to remove scan server refs from metadata location", e); + Thread.sleep(1000); + updateIdleStatus( + sessionManager.getActiveScans().isEmpty() && tabletMetadataCache.estimatedSize() == 0); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + gracefulShutdown(getContext().rpcCreds()); } + } - try { - LOG.debug("Closing filesystems"); - VolumeManager mgr = getContext().getVolumeManager(); - if (null != mgr) { - mgr.close(); - } - } catch (IOException e) { - LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); - } + // Wait for scans to get to zero + while (!sessionManager.getActiveScans().isEmpty()) { + LOG.debug("Waiting on {} active scans to complete.", sessionManager.getActiveScans().size()); + UtilWaitThread.sleep(1000); + } - if (tmCacheExecutor != null) { - LOG.debug("Shutting down TabletMetadataCache executor"); - tmCacheExecutor.shutdownNow(); - } + LOG.debug("Stopping Thrift Servers"); + getThriftServer().stop(); - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - super.close(); - getShutdownComplete().set(true); - LOG.info("stop requested. exiting ... 
"); - try { - if (null != lock) { - lock.unlock(); - } - } catch (Exception e) { - LOG.warn("Failed to release scan server lock", e); + try { + LOG.info("Removing server scan references"); + this.getContext().getAmple().scanServerRefs().delete(getAdvertiseAddress().toString(), + serverLockUUID); + } catch (Exception e) { + LOG.warn("Failed to remove scan server refs from metadata location", e); + } + + try { + LOG.debug("Closing filesystems"); + VolumeManager mgr = getContext().getVolumeManager(); + if (null != mgr) { + mgr.close(); } + } catch (IOException e) { + LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); + } + if (tmCacheExecutor != null) { + LOG.debug("Shutting down TabletMetadataCache executor"); + tmCacheExecutor.shutdownNow(); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index bee188443e..1861389feb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -671,7 +671,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer try { // Ask the manager to unload our tablets and stop loading new tablets if (iface == null) { - Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!"); + Halt.halt(1, "Error informing Manager that we are shutting down, exiting!"); } else { iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(), getTabletSession().getHostPortSession(), getResourceGroup().canonical()); @@ -690,7 +690,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer sendManagerMessages(managerDown, iface, advertiseAddressString); } catch (TException | RuntimeException e) { - Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!", e); + Halt.halt(1, "Error informing Manager that we are shutting down, exiting!", e); } finally { returnManagerConnection(iface); } @@ -706,16 +706,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer } catch (IOException e) { log.warn("Failed to close filesystem : {}", e.getMessage(), e); } - - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - super.close(); - getShutdownComplete().set(true); - log.info("TServerInfo: stop requested. exiting ... 
"); - try { - tabletServerLock.unlock(); - } catch (Exception e) { - log.warn("Failed to release tablet server lock", e); - } } private boolean sendManagerMessages(boolean managerDown, ManagerClientService.Client iface, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 1ae26c5a05..559cc8a487 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -455,7 +455,7 @@ public class TabletServerLogger { if (sawWriteFailure != null) { log.info("WAL write failure, validating server lock in ZooKeeper", sawWriteFailure); if (tabletServerLock == null || !tabletServerLock.verifyLockAtSource()) { - Halt.halt(-1, "Writing to WAL has failed and TabletServer lock does not exist", + Halt.halt(1, "Writing to WAL has failed and TabletServer lock does not exist", sawWriteFailure); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java index e091ecb401..113aa1928d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java @@ -99,7 +99,7 @@ public class MinorCompactor extends FileCompactor { if (tserverLock == null || !tserverLock.verifyLockAtSource()) { log.error("Minor compaction of {} has failed and TabletServer lock does not exist." + " Halting...", getExtent(), e); - Halt.halt(-1, "TabletServer lock does not exist", e); + Halt.halt(1, "TabletServer lock does not exist", e); } else { throw e; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index c2170a42d4..1c96bddbe7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -414,7 +414,7 @@ public class Tablet extends TabletBase { if (tserverLock == null || !tserverLock.verifyLockAtSource()) { log.error("Minor compaction of {} has failed and TabletServer lock does not exist." + " Halting...", getExtent(), e); - Halt.halt(-1, "TabletServer lock does not exist", e); + Halt.halt(1, "TabletServer lock does not exist", e); } else { TraceUtil.setException(span2, e, true); throw e; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT.java new file mode 100644 index 0000000000..e37ef29975 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.accumulo.tserver.TabletServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; + +@Tag(MINI_CLUSTER_ONLY) +public class ExitCodesIT extends SharedMiniClusterBase { + + public static enum TerminalBehavior { + SHUTDOWN, EXCEPTION, ERROR + }; + + public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior"; + + public static TerminalBehavior getTerminalBehavior() { + final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR); + Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as an env var"); + return TerminalBehavior.valueOf(methodBehavior); + } + + public static class ExitCompactor extends Compactor { + + public static void main(String[] args) throws Exception { + List<String> compactorArgs = new ArrayList<>(); + compactorArgs.add("-o"); + compactorArgs.add(Property.COMPACTOR_GROUP_NAME.getKey() + "=TEST"); + AbstractServer.startServer(new ExitCompactor(getTerminalBehavior(), compactorArgs), + LoggerFactory.getLogger(ExitCompactor.class)); + } + + private final TerminalBehavior behavior; + + public ExitCompactor(TerminalBehavior behavior, List<String> compactorArgs) { + super(new ConfigOpts(), compactorArgs.toArray(new String[] {})); + this.behavior = behavior; + } + + @Override + public void updateIdleStatus(boolean isIdle) { + switch (behavior) { + case ERROR: + throw new StackOverflowError("throwing stack overflow error"); + case EXCEPTION: + throw new RuntimeException("throwing runtime exception"); + case SHUTDOWN: + requestShutdownForTests(); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + } + } + + public static 
class ExitScanServer extends ScanServer { + + public static void main(String[] args) throws Exception { + List<String> sserverArgs = new ArrayList<>(); + sserverArgs.add("-o"); + sserverArgs.add(Property.SSERV_GROUP_NAME.getKey() + "=TEST"); + AbstractServer.startServer(new ExitScanServer(getTerminalBehavior(), sserverArgs), + LoggerFactory.getLogger(ExitScanServer.class)); + } + + private final TerminalBehavior behavior; + + public ExitScanServer(TerminalBehavior behavior, List<String> sserverArgs) { + super(new ConfigOpts(), sserverArgs.toArray(new String[] {})); + this.behavior = behavior; + } + + @Override + public void updateIdleStatus(boolean isIdle) { + switch (behavior) { + case ERROR: + throw new StackOverflowError("throwing stack overflow error"); + case EXCEPTION: + throw new RuntimeException("throwing runtime exception"); + case SHUTDOWN: + requestShutdownForTests(); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + } + } + + public static class ExitTabletServer extends TabletServer { + + public static void main(String[] args) throws Exception { + List<String> tserverArgs = new ArrayList<>(); + tserverArgs.add("-o"); + tserverArgs.add(Property.TSERV_GROUP_NAME.getKey() + "=TEST"); + AbstractServer.startServer(new ExitTabletServer(getTerminalBehavior(), tserverArgs), + LoggerFactory.getLogger(ExitTabletServer.class)); + } + + private final TerminalBehavior behavior; + + public ExitTabletServer(TerminalBehavior behavior, List<String> tserverArgs) { + super(new ConfigOpts(), ServerContext::new, tserverArgs.toArray(new String[] {})); + this.behavior = behavior; + } + + @Override + public void updateIdleStatus(boolean isIdle) { + switch (behavior) { + case ERROR: + throw new StackOverflowError("throwing stack overflow error"); + case EXCEPTION: + throw new RuntimeException("throwing runtime exception"); + case SHUTDOWN: + requestShutdownForTests(); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + } + } + + public static class ExitGC extends SimpleGarbageCollector { + + public static void main(String[] args) throws Exception { + AbstractServer.startServer(new ExitGC(getTerminalBehavior()), + LoggerFactory.getLogger(ExitGC.class)); + } + + private final TerminalBehavior behavior; + + public ExitGC(TerminalBehavior behavior) { + super(new ConfigOpts(), new String[] {}); + this.behavior = behavior; + } + + @Override + public void logStats() { + switch (behavior) { + case ERROR: + throw new StackOverflowError("throwing stack overflow error"); + case EXCEPTION: + throw new RuntimeException("throwing runtime exception"); + case SHUTDOWN: + requestShutdownForTests(); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + } + } + + public static class ExitManager extends Manager { + + public static void main(String[] args) throws Exception { + AbstractServer.startServer(new ExitManager(getTerminalBehavior()), + LoggerFactory.getLogger(ExitManager.class)); + } + + private final TerminalBehavior behavior; + + public ExitManager(TerminalBehavior behavior) throws IOException { + super(new ConfigOpts(), ServerContext::new, new String[] {}); + this.behavior = behavior; + } + + @Override + public void mainWait() throws InterruptedException { + switch (behavior) { + case ERROR: + throw new StackOverflowError("throwing stack overflow error"); + case EXCEPTION: + throw new RuntimeException("throwing runtime exception"); + 
case SHUTDOWN: + requestShutdownForTests(); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + } + } + + @BeforeAll + public static void beforeTests() throws Exception { + // Start MiniCluster so that getCluster() does not + // return null, + SharedMiniClusterBase.startMiniCluster(); + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); + } + + @AfterAll + public static void afterTests() { + SharedMiniClusterBase.stopMiniCluster(); + } + + // Junit doesn't support more than one parameterized + // argument to a test method. We need to generate + // the arguments here. + static Stream<Arguments> generateWorkerProcessArguments() { + List<Arguments> args = new ArrayList<>(); + for (ServerType st : ServerType.values()) { + if (st == ServerType.COMPACTOR || st == ServerType.SCAN_SERVER + || st == ServerType.TABLET_SERVER) { + for (TerminalBehavior tb : TerminalBehavior.values()) { + args.add(Arguments.of(st, tb)); + } + } + } + return args.stream(); + } + + @ParameterizedTest + @MethodSource("generateWorkerProcessArguments") + public void testWorkerProcesses(ServerType server, TerminalBehavior behavior) throws Exception { + Map<String,String> properties = new HashMap<>(); + properties.put(PROXY_METHOD_BEHAVIOR, behavior.name()); + getCluster().getConfig().setSystemProperties(properties); + Class<?> serverClass = null; + switch (server) { + case COMPACTOR: + serverClass = ExitCompactor.class; + break; + case SCAN_SERVER: + serverClass = ExitScanServer.class; + break; + case TABLET_SERVER: + serverClass = ExitTabletServer.class; + break; + case GARBAGE_COLLECTOR: + case MANAGER: + case MONITOR: + case ZOOKEEPER: + default: + throw new IllegalArgumentException("Unhandled type"); + } + ProcessInfo pi = getCluster()._exec(serverClass, server, Map.of(), new String[] {}); + Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000); + int exitValue = pi.getProcess().exitValue(); + assertEquals(behavior == TerminalBehavior.SHUTDOWN ? 0 : 1, exitValue); + } + + @ParameterizedTest + @EnumSource + public void testGarbageCollector(TerminalBehavior behavior) throws Exception { + Map<String,String> properties = new HashMap<>(); + properties.put(PROXY_METHOD_BEHAVIOR, behavior.name()); + getCluster().getConfig().setSystemProperties(properties); + ProcessInfo pi = + getCluster()._exec(ExitGC.class, ServerType.GARBAGE_COLLECTOR, Map.of(), new String[] {}); + if (behavior == TerminalBehavior.SHUTDOWN) { + Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000); + int exitValue = pi.getProcess().exitValue(); + assertEquals(0, exitValue); + } else if (behavior == TerminalBehavior.EXCEPTION) { + // GarbageCollector logs exceptions and keeps going. + // We need to let this time out and then + // terminate the process. 
+ IllegalStateException ise = assertThrows(IllegalStateException.class, + () -> Wait.waitFor(() -> !pi.getProcess().isAlive(), 60_000)); + assertTrue(ise.getMessage().contains("Timeout exceeded")); + pi.getProcess().destroyForcibly(); + } else { + Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000); + int exitValue = pi.getProcess().exitValue(); + assertEquals(1, exitValue); + } + } + + @ParameterizedTest + @EnumSource + public void testManager(TerminalBehavior behavior) throws Exception { + try { + getCluster().getClusterControl().stop(ServerType.MANAGER); + Map<String,String> properties = new HashMap<>(); + properties.put(PROXY_METHOD_BEHAVIOR, behavior.name()); + getCluster().getConfig().setSystemProperties(properties); + ProcessInfo pi = + getCluster()._exec(ExitManager.class, ServerType.MANAGER, Map.of(), new String[] {}); + Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000); + int exitValue = pi.getProcess().exitValue(); + assertEquals(behavior == TerminalBehavior.SHUTDOWN ? 0 : 1, exitValue); + } finally { + getCluster().getClusterControl().stop(ServerType.MANAGER); + } + } + +}
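
A recurring change in this commit is replacing `Halt.halt(-1, ...)` with `Halt.halt(1, ...)`. On POSIX platforms the exit status reported to a parent process is an unsigned byte, so a JVM that halts with -1 is observed as 255 rather than a literal -1. The standalone sketch below (not part of the patch; class names are hypothetical) launches a child JVM that halts with -1 and prints the status the parent actually sees:

```java
import java.io.File;

// Hypothetical demo, not part of the Accumulo patch: shows that a child JVM
// halting with -1 is reported to the parent as exit status 255 on POSIX.
public class ExitStatusDemo {

  // Child entry point: terminate immediately with status -1,
  // the way Halt.halt(-1, ...) did before this commit.
  public static class Child {
    public static void main(String[] args) {
      Runtime.getRuntime().halt(-1);
    }
  }

  public static void main(String[] args) throws Exception {
    String javaBin =
        System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
    Process child = new ProcessBuilder(javaBin, "-cp", System.getProperty("java.class.path"),
        ExitStatusDemo.Child.class.getName()).inheritIO().start();
    int status = child.waitFor();
    // Prints 255 on POSIX platforms, not -1, which is why the patch
    // standardizes the error path on halt(1).
    System.out.println("Child exit status observed by parent: " + status);
  }
}
```

Using 1 keeps the advertised error code and the code a test (or init script) reads from the dead process in agreement, which is what ExitCodesIT asserts.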
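
The AbstractServer change routes run() and close() through a single server thread whose uncaught-exception handler is consulted after join(), so an escaped Throwable leaves the JVM with a non-zero exit code instead of being swallowed during cleanup. A minimal, simplified sketch of that pattern is below (names are illustrative, not the actual AbstractServer code):

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch of the run-then-close pattern: the main thread joins the
// server thread, then rethrows any uncaught Throwable so the JVM exits non-zero.
public class RunServerSketch {

  interface Server extends Runnable, AutoCloseable {
    @Override
    void close(); // idempotent in the real code (guarded by an AtomicBoolean)
  }

  static void runServer(Server server, String name) throws Exception {
    AtomicReference<Throwable> err = new AtomicReference<>();
    Thread serverThread = new Thread(() -> {
      server.run();   // main service loop; returns when shutdown is requested
      server.close(); // release resources only after the loop exits cleanly
    }, name);
    serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
    serverThread.start();
    serverThread.join();
    Throwable thrown = err.get();
    if (thrown != null) {
      // Propagating the Throwable out of main() leaves the JVM with a non-zero exit code.
      if (thrown instanceof Error) {
        throw (Error) thrown;
      }
      throw new IllegalStateException(name + " thread died", thrown);
    }
  }
}
```

Because close() only runs when the loop exits normally, the shutdownComplete flag it sets is still false when the ZooKeeper session drops after a crash, and the lock watcher then halts with a non-zero status as the patch intends.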