This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new c6c2d8d  Don't return null from Thrift called function, added comments about seen error
c6c2d8d is described below

commit c6c2d8d32c403d897589c714ab4c4364793c183d
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Mar 16 13:22:55 2021 +0000

    Don't return null from Thrift called function, added comments about seen error
---
 .../coordinator/CompactionCoordinator.java         | 119 +++++++++++++++++++--
 .../org/apache/accumulo/compactor/Compactor.java   |   4 +
 2 files changed, 114 insertions(+), 9 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index a8e2f23..5ede596 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -244,9 +244,18 @@ public class CompactionCoordinator extends AbstractServer
       Set<TServerInstance> added) {
 
     // run() will iterate over the current and added tservers and add them to 
the internal
-    // data structures. For tservers that are deleted, we need to remove them 
from the
-    // internal data structures
+    // data structures. For tservers that are deleted, we need to remove them from QUEUES
+    // and INDEX and cancel any RUNNING compactions as we currently don't have a way
+    // to notify a tabletserver that a compaction has completed when the tablet is re-hosted.
     deleted.forEach(tsi -> {
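+      // Find any running compactions for the tserver.
+      // NOTE(review): the check below compares RunningCompaction.getCompactorAddress() (the
+      // compactor's address, judging by the name) with a TServerInstance. Confirm the types and
+      // intent: if getCompactorAddress() identifies the compactor rather than the tserver that
+      // requested the job, this never matches and no compactions are cancelled.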
+      final List<ExternalCompactionId> toCancel = new ArrayList<>();
+      RUNNING.forEach((k, v) -> {
+        if (v.getCompactorAddress().equals(tsi)) {
+          toCancel.add(k);
+        }
+      });
+      // Remove the tserver from the QUEUES and INDEX
       INDEX.get(tsi).forEach(qp -> {
         TreeMap<Long,LinkedHashSet<TServerInstance>> m = 
QUEUES.get(qp.getQueue());
         if (null != m) {
@@ -259,6 +268,15 @@ public class CompactionCoordinator extends AbstractServer
           }
         }
       });
+      // Cancel running compactions
+      toCancel.forEach(id -> {
+        try {
+          cancelCompaction(id.canonical());
+        } catch (TException e) {
+          LOG.error("Error cancelling running compaction {} due to tserver {} 
removal.", id, tsi,
+              e);
+        }
+      });
     });
   }
 
@@ -313,13 +331,9 @@ public class CompactionCoordinator extends AbstractServer
     }
 
     if (null == tserver) {
-      LOG.debug("No compactions found for queue {}, returning null to 
compactor {}", queue,
+      LOG.debug("No compactions found for queue {}, returning empty job to 
compactor {}", queue,
           compactorAddress);
-      // CBUG Returning null here causes:
-      // [compaction.RetryableThriftCall] ERROR: Error in Thrift function, 
retrying in 60000ms.
-      // Error: org.apache.thrift.TApplicationException: getCompactionJob 
failed: unknown result
-
-      return null;
+      return new TExternalCompactionJob();
     }
 
     TabletClientService.Client client = null;
@@ -385,7 +399,8 @@ public class CompactionCoordinator extends AbstractServer
     try {
       cancelThriftCall.run();
     } catch (RetriesExceededException e) {
-      // TODO: This should not occur as the cancelThriftCall has unlimited 
retries
+      LOG.error("Unable to contact Compactor {} to cancel running compaction 
{}",
+          rc.getCompactorAddress(), rc.getJob(), e);
     }
   }
 
@@ -446,6 +461,64 @@ public class CompactionCoordinator extends AbstractServer
           }
         });
     try {
+      // CBUG Saw the following situation in testing:
+      // 1. Compactor ran a compaction, and it completed.
+      // 2. Compactor called this method.
+      // 3. A Thrift timeout occurred and the Compactor retried via RetryableThriftCall.
+      // 4. On the retry, this method threw UnknownCompactionIdException because there was
+      // no entry in RUNNING for the compaction id.
+
+      // See isCompactionCompleted() below, which a tserver could poll to check whether the
+      // compaction has completed.
+
+      // "compactor" #38 prio=5 os_prio=0 cpu=157.59ms elapsed=197.99s 
tid=0x000055ea28438800
+      // nid=0x4dae runnable [0x00007fb0c5f2e000]
+      // java.lang.Thread.State: RUNNABLE
+      // at sun.nio.ch.EPoll.wait(java.base@11.0.10/Native Method)
+      // at 
sun.nio.ch.EPollSelectorImpl.doSelect(java.base@11.0.10/EPollSelectorImpl.java:120)
+      // at 
sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@11.0.10/SelectorImpl.java:124)
+      // - locked <0x00000000f449acc0> (a sun.nio.ch.Util$2)
+      // - locked <0x00000000f449aa60> (a sun.nio.ch.EPollSelectorImpl)
+      // at 
sun.nio.ch.SelectorImpl.select(java.base@11.0.10/SelectorImpl.java:136)
+      // at
+      // 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
+      // at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
+      // at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
+      // at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
+      // at 
java.io.FilterInputStream.read(java.base@11.0.10/FilterInputStream.java:133)
+      // at 
java.io.BufferedInputStream.fill(java.base@11.0.10/BufferedInputStream.java:252)
+      // at 
java.io.BufferedInputStream.read1(java.base@11.0.10/BufferedInputStream.java:292)
+      // at 
java.io.BufferedInputStream.read(java.base@11.0.10/BufferedInputStream.java:351)
+      // - locked <0x00000000f466c028> (a java.io.BufferedInputStream)
+      // at 
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
+      // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
+      // at 
org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:132)
+      // at 
org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:100)
+      // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
+      // at
+      // 
org.apache.accumulo.core.clientImpl.ThriftTransportPool$CachedTTransport.readAll(ThriftTransportPool.java:546)
+      // at 
org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:637)
+      // at 
org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:505)
+      // at 
org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
+      // at
+      // 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.recv_compactionCompleted(CompactionCoordinator.java:118)
+      // at
+      // 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.compactionCompleted(CompactionCoordinator.java:104)
+      // at 
org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:296)
+      // at 
org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:291)
+      // at
+      // 
org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:102)
+      // at 
org.apache.accumulo.compactor.Compactor.updateCompactionCompleted(Compactor.java:304)
+      // at org.apache.accumulo.compactor.Compactor.run(Compactor.java:509)
+
+      // "CompactionCoordinator-ClientPool-Worker-1" #47 daemon prio=5 
os_prio=0 cpu=70.37ms
+      // elapsed=243.97s tid=0x00005590266cb800 nid=0x4db3 waiting on 
condition [0x00007f4c8cb6c000]
+      // java.lang.Thread.State: TIMED_WAITING (sleeping)
+      // at java.lang.Thread.sleep(java.base@11.0.10/Native Method)
+      // at 
org.apache.accumulo.fate.util.UtilWaitThread.sleep(UtilWaitThread.java:33)
+      // at
+      // 
org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:113)
+      // at
+      // 
org.apache.accumulo.coordinator.CompactionCoordinator.compactionCompleted(CompactionCoordinator.java:462)
+
       completedThriftCall.run();
     } catch (RetriesExceededException e) {
       // TODO: What happens if tserver is no longer hosting tablet? I wonder 
if we should not notify
@@ -467,6 +540,34 @@ public class CompactionCoordinator extends AbstractServer
        */
 
     }
+
+  }
+
+  /**
+   * Called by a TabletServer to check whether an external compaction has completed.
+   *
+   * <p>
+   * Outcomes:
+   * <ul>
+   * <li>id present in RUNNING and its getStats() is non-null: the entry is removed from RUNNING
+   * and those stats are returned.</li>
+   * <li>id present in RUNNING but getStats() is null: a new, empty {@link CompactionStats} is
+   * returned as a "not done" marker. This method never returns null.</li>
+   * <li>id not present in RUNNING: {@link UnknownCompactionIdException} is thrown.</li>
+   * </ul>
+   *
+   * <p>
+   * NOTE(review): the empty CompactionStats marker may be indistinguishable from a completed
+   * compaction whose stats are all default values - confirm callers can tell them apart.
+   *
+   * @param externalCompactionId
+   *          external compaction id, in the string form accepted by ExternalCompactionId.of
+   * @return the compaction's stats if it has completed, otherwise an empty CompactionStats
+   * @throws TException
+   *           declared by this method; UnknownCompactionIdException is thrown when the id has no
+   *           entry in RUNNING
+   */
+  public CompactionStats isCompactionCompleted(String externalCompactionId) 
throws TException {
+    var ecid = ExternalCompactionId.of(externalCompactionId);
+    RunningCompaction rc = RUNNING.get(ecid);
+    if (null != rc && null != rc.getStats()) {
+      // Completed: the two-arg remove drops the entry only if it is still mapped to this rc.
+      RUNNING.remove(ecid, rc);
+      return rc.getStats();
+    } else if (rc == null) {
+      // Unknown id: nothing in RUNNING for it, so report an error to the caller.
+      LOG.error(
+          "isCompactionCompleted called by TServer for {}, but no running 
compaction for that id.",
+          externalCompactionId);
+      throw new UnknownCompactionIdException();
+    } else {
+      LOG.debug("isCompactionCompleted called by TServer for {}, but 
compaction is not complete.",
+          externalCompactionId);
+      // Return empty stats as a marker that it's not done (Thrift-called, so avoid returning null).
+      return new CompactionStats();
+    }
   }
 
   /**
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 503b1c5..f1d2454 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -450,6 +450,10 @@ public class Compactor extends AbstractServer
         TExternalCompactionJob job;
         try {
           job = getNextJob();
+          if (!job.isSetExternalCompactionId()) {
+            LOG.info("No external compactions in queue {}", this.queueName);
+            continue;
+          }
         } catch (RetriesExceededException e2) {
           LOG.warn("Retries exceeded getting next job. Retrying...");
           continue;

Reply via email to