This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new c6c2d8d Don't return null from Thrift called function, added comments about seen error c6c2d8d is described below commit c6c2d8d32c403d897589c714ab4c4364793c183d Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Mar 16 13:22:55 2021 +0000 Don't return null from Thrift called function, added comments about seen error --- .../coordinator/CompactionCoordinator.java | 119 +++++++++++++++++++-- .../org/apache/accumulo/compactor/Compactor.java | 4 + 2 files changed, 114 insertions(+), 9 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index a8e2f23..5ede596 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -244,9 +244,18 @@ public class CompactionCoordinator extends AbstractServer Set<TServerInstance> added) { // run() will iterate over the current and added tservers and add them to the internal - // data structures. For tservers that are deleted, we need to remove them from the - // internal data structures + // data structures. For tservers that are deleted, we need to remove them from QUEUES + // and INDEX and cancel any RUNNING compactions as we currently don't have a way + // to notify a tabletserver that a compaction has completed when the tablet is re-hosted.
deleted.forEach(tsi -> { + // Find any running compactions for the tserver + final List<ExternalCompactionId> toCancel = new ArrayList<>(); + RUNNING.forEach((k, v) -> { + if (v.getCompactorAddress().equals(tsi)) { + toCancel.add(k); + } + }); + // Remove the tserver from the QUEUES and INDEX INDEX.get(tsi).forEach(qp -> { TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(qp.getQueue()); if (null != m) { @@ -259,6 +268,15 @@ public class CompactionCoordinator extends AbstractServer } } }); + // Cancel running compactions + toCancel.forEach(id -> { + try { + cancelCompaction(id.canonical()); + } catch (TException e) { + LOG.error("Error cancelling running compaction {} due to tserver {} removal.", id, tsi, + e); + } + }); }); } @@ -313,13 +331,9 @@ public class CompactionCoordinator extends AbstractServer } if (null == tserver) { - LOG.debug("No compactions found for queue {}, returning null to compactor {}", queue, + LOG.debug("No compactions found for queue {}, returning empty job to compactor {}", queue, compactorAddress); - // CBUG Returning null here causes: - // [compaction.RetryableThriftCall] ERROR: Error in Thrift function, retrying in 60000ms. - // Error: org.apache.thrift.TApplicationException: getCompactionJob failed: unknown result - - return null; + return new TExternalCompactionJob(); } TabletClientService.Client client = null; @@ -385,7 +399,8 @@ public class CompactionCoordinator extends AbstractServer try { cancelThriftCall.run(); } catch (RetriesExceededException e) { - // TODO: This should not occur as the cancelThriftCall has unlimited retries + LOG.error("Unable to contact Compactor {} to cancel running compaction {}", + rc.getCompactorAddress(), rc.getJob(), e); } } @@ -446,6 +461,64 @@ public class CompactionCoordinator extends AbstractServer } }); try { + // CBUG Saw the following situation in testing: + // 1. Compactor ran a compaction and completed. + // 2. Compactor called this method + // 3. 
Thrift timeout occurred and the Compactor retried due to RetryableThriftCall + // 4. Upon retry this method returned UnknownCompactionIdException because no entry in RUNNING + + // See Method below where tserver could poll coordinator to see if compaction is completed. + + // "compactor" #38 prio=5 os_prio=0 cpu=157.59ms elapsed=197.99s tid=0x000055ea28438800 + // nid=0x4dae runnable [0x00007fb0c5f2e000] + // java.lang.Thread.State: RUNNABLE + // at sun.nio.ch.EPoll.wait(java.base@11.0.10/Native Method) + // at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@11.0.10/EPollSelectorImpl.java:120) + // at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@11.0.10/SelectorImpl.java:124) + // - locked <0x00000000f449acc0> (a sun.nio.ch.Util$2) + // - locked <0x00000000f449aa60> (a sun.nio.ch.EPollSelectorImpl) + // at sun.nio.ch.SelectorImpl.select(java.base@11.0.10/SelectorImpl.java:136) + // at + // org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336) + // at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157) + // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) + // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131) + // at java.io.FilterInputStream.read(java.base@11.0.10/FilterInputStream.java:133) + // at java.io.BufferedInputStream.fill(java.base@11.0.10/BufferedInputStream.java:252) + // at java.io.BufferedInputStream.read1(java.base@11.0.10/BufferedInputStream.java:292) + // at java.io.BufferedInputStream.read(java.base@11.0.10/BufferedInputStream.java:351) + // - locked <0x00000000f466c028> (a java.io.BufferedInputStream) + // at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) + // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) + // at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:132) + // at 
org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:100) + // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) + // at + // org.apache.accumulo.core.clientImpl.ThriftTransportPool$CachedTTransport.readAll(ThriftTransportPool.java:546) + // at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:637) + // at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:505) + // at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) + // at + // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.recv_compactionCompleted(CompactionCoordinator.java:118) + // at + // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.compactionCompleted(CompactionCoordinator.java:104) + // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:296) + // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:291) + // at + // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:102) + // at org.apache.accumulo.compactor.Compactor.updateCompactionCompleted(Compactor.java:304) + // at org.apache.accumulo.compactor.Compactor.run(Compactor.java:509) + + // "CompactionCoordinator-ClientPool-Worker-1" #47 daemon prio=5 os_prio=0 cpu=70.37ms + // elapsed=243.97s tid=0x00005590266cb800 nid=0x4db3 waiting on condition [0x00007f4c8cb6c000] + // java.lang.Thread.State: TIMED_WAITING (sleeping) + // at java.lang.Thread.sleep(java.base@11.0.10/Native Method) + // at org.apache.accumulo.fate.util.UtilWaitThread.sleep(UtilWaitThread.java:33) + // at + // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:113) + // at + // org.apache.accumulo.coordinator.CompactionCoordinator.compactionCompleted(CompactionCoordinator.java:462) + completedThriftCall.run(); } catch (RetriesExceededException e) { // TODO: What happens if tserver is no longer hosting tablet? 
I wonder if we should not notify @@ -467,6 +540,34 @@ public class CompactionCoordinator extends AbstractServer */ } + + } + + /** + * Called by TabletServer to check if an external compaction has been completed. + * + * + * @param externalCompactionId id of the external compaction to check + * @return the CompactionStats if completed, or an empty CompactionStats if not yet completed + * @throws TException + */ + public CompactionStats isCompactionCompleted(String externalCompactionId) throws TException { + var ecid = ExternalCompactionId.of(externalCompactionId); + RunningCompaction rc = RUNNING.get(ecid); + if (null != rc && null != rc.getStats()) { + RUNNING.remove(ecid, rc); + return rc.getStats(); + } else if (rc == null) { + LOG.error( + "isCompactionCompleted called by TServer for {}, but no running compaction for that id.", + externalCompactionId); + throw new UnknownCompactionIdException(); + } else { + LOG.debug("isCompactionCompleted called by TServer for {}, but compaction is not complete.", + externalCompactionId); + // Return empty stats as a marker that it's not done. + return new CompactionStats(); + } } /** diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 503b1c5..f1d2454 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -450,6 +450,10 @@ public class Compactor extends AbstractServer TExternalCompactionJob job; try { job = getNextJob(); + if (!job.isSetExternalCompactionId()) { + LOG.info("No external compactions in queue {}", this.queueName); + continue; + } } catch (RetriesExceededException e2) { LOG.warn("Retries exceeded getting next job. Retrying..."); continue;