This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 5e4e30b68b fixes #3045 remove stale compactions from coordinator 
(#3059)
5e4e30b68b is described below

commit 5e4e30b68bf14b82b48442cab33d1bd58b943a89
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Nov 2 16:55:55 2022 +0000

    fixes #3045 remove stale compactions from coordinator (#3059)
---
 pom.xml                                            |  2 +-
 .../coordinator/CompactionCoordinator.java         | 94 +++++++++++++++-------
 .../coordinator/CompactionCoordinatorTest.java     | 64 ++++++++++++++-
 3 files changed, 131 insertions(+), 29 deletions(-)

diff --git a/pom.xml b/pom.xml
index 96833e5eb3..abc036686a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1701,7 +1701,7 @@
         <jdk>[17,)</jdk>
       </activation>
       <properties>
-        <extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens 
java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED 
--add-opens java.base/java.net=ALL-UNNAMED --add-opens 
java.management/java.lang.management=ALL-UNNAMED --add-opens 
java.management/sun.management=ALL-UNNAMED --add-opens 
java.base/java.security=ALL-UNNAMED --add-opens 
java.base/java.lang.reflect=ALL-UNNAMED --add-opens 
java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/jav [...]
+        <extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens 
java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED 
--add-opens java.base/java.net=ALL-UNNAMED --add-opens 
java.management/java.lang.management=ALL-UNNAMED --add-opens 
java.management/sun.management=ALL-UNNAMED --add-opens 
java.base/java.security=ALL-UNNAMED --add-opens 
java.base/java.lang.reflect=ALL-UNNAMED --add-opens 
java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/jav [...]
       </properties>
     </profile>
   </profiles>
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 27e4881143..232a6066b8 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
 import org.apache.accumulo.core.Constants;
@@ -55,7 +56,9 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metrics.MetricsUtil;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
@@ -90,6 +93,7 @@ import org.slf4j.LoggerFactory;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.Sets;
 
 public class CompactionCoordinator extends AbstractServer
     implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
@@ -100,8 +104,14 @@ public class CompactionCoordinator extends AbstractServer
 
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
 
-  /* Map of compactionId to RunningCompactions */
-  protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
+  /*
+   * Map of compactionId to RunningCompactions. This is an informational cache 
of what external
+   * compactions may be running. It's possible it may contain external 
compactions that are not
+   * actually running. It may also be missing compactions that are actually 
running. The metadata table
+   * is the most authoritative source of what external compactions are 
currently running, but it
+   * does not have the stats that this map has.
+   */
+  protected static final Map<ExternalCompactionId,RunningCompaction> 
RUNNING_CACHE =
       new ConcurrentHashMap<>();
 
   private static final Cache<ExternalCompactionId,RunningCompaction> COMPLETED 
=
@@ -137,6 +147,7 @@ public class CompactionCoordinator extends AbstractServer
     startGCLogger(schedExecutor);
     printStartupMsg();
     startCompactionCleaner(schedExecutor);
+    startRunningCleaner(schedExecutor);
   }
 
   @Override
@@ -170,6 +181,12 @@ public class CompactionCoordinator extends AbstractServer
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
+  protected void startRunningCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
+    ScheduledFuture<?> future =
+        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, 
TimeUnit.MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
+  }
+
   protected void printStartupMsg() {
     LOG.info("Version " + Constants.VERSION);
     LOG.info("Instance " + getContext().getInstanceID());
@@ -277,7 +294,7 @@ public class CompactionCoordinator extends AbstractServer
         update.setState(TCompactionState.IN_PROGRESS);
         update.setMessage("Coordinator restarted, compaction found in 
progress");
         rc.addUpdate(System.currentTimeMillis(), update);
-        
RUNNING.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
+        
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()),
 rc);
       });
     }
 
@@ -446,7 +463,10 @@ public class CompactionCoordinator extends AbstractServer
           prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
           continue;
         }
-        RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
+        // It is possible that by the time this is added the tablet has 
already canceled the
+        // compaction or the compactor that made this request is dead. In 
these cases the compaction
+        // is not actually running.
+        
RUNNING_CACHE.put(ExternalCompactionId.of(job.getExternalCompactionId()),
             new RunningCompaction(job, compactorAddress, queue));
         LOG.debug("Returning external job {} to {}", job.externalCompactionId, 
compactorAddress);
         result = job;
@@ -523,15 +543,7 @@ public class CompactionCoordinator extends AbstractServer
     // It's possible that RUNNING might not have an entry for this ecid in the 
case
     // of a coordinator restart when the Coordinator can't find the TServer 
for the
     // corresponding external compaction.
-    final RunningCompaction rc = RUNNING.get(ecid);
-    if (null != rc) {
-      RUNNING.remove(ecid, rc);
-      COMPLETED.put(ecid, rc);
-    } else {
-      LOG.warn(
-          "Compaction completed called by Compactor for {}, but no running 
compaction for that id.",
-          externalCompactionId);
-    }
+    recordCompletion(ecid);
   }
 
   @Override
@@ -549,17 +561,7 @@ public class CompactionCoordinator extends AbstractServer
 
   void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
     compactionFinalizer.failCompactions(compactions);
-    compactions.forEach((k, v) -> {
-      final RunningCompaction rc = RUNNING.get(k);
-      if (null != rc) {
-        RUNNING.remove(k, rc);
-        COMPLETED.put(k, rc);
-      } else {
-        LOG.warn(
-            "Compaction failed called by Compactor for {}, but no running 
compaction for that id.",
-            k);
-      }
-    });
+    compactions.forEach((k, v) -> recordCompletion(k));
   }
 
   /**
@@ -589,12 +591,49 @@ public class CompactionCoordinator extends AbstractServer
     }
     LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", 
externalCompactionId,
         timestamp, update);
-    final RunningCompaction rc = 
RUNNING.get(ExternalCompactionId.of(externalCompactionId));
+    final RunningCompaction rc = 
RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
     if (null != rc) {
       rc.addUpdate(timestamp, update);
     }
   }
 
+  private void recordCompletion(ExternalCompactionId ecid) {
+    var rc = RUNNING_CACHE.remove(ecid);
+    if (rc != null) {
+      COMPLETED.put(ecid, rc);
+    }
+  }
+
+  protected Set<ExternalCompactionId> readExternalCompactionIds() {
+    return getContext().getAmple().readTablets().forLevel(Ample.DataLevel.USER)
+        .fetch(TabletMetadata.ColumnType.ECOMP).build().stream()
+        .flatMap(tm -> 
tm.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
+  }
+
+  /**
+   * The RUNNING_CACHE map may contain external compactions that are not 
actually running. This
+   * method, which is run periodically, cleans those up.
+   */
+  protected void cleanUpRunning() {
+
+    // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
+    // avoid removing things that are added while reading the metadata.
+    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
+
+    // grab the ids that are listed as running in the metadata table. It is 
important that this is done
+    // after getting the snapshot.
+    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
+
+    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
+
+    // remove ids that are in the running set but not in the metadata table
+    idsToRemove.forEach(ecid -> recordCompletion(ecid));
+
+    if (idsToRemove.size() > 0) {
+      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
+    }
+  }
+
   /**
    * Return information about running compactions
    *
@@ -614,8 +653,9 @@ public class CompactionCoordinator extends AbstractServer
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
+
     final TExternalCompactionList result = new TExternalCompactionList();
-    RUNNING.forEach((ecid, rc) -> {
+    RUNNING_CACHE.forEach((ecid, rc) -> {
       TExternalCompaction trc = new TExternalCompaction();
       trc.setQueueName(rc.getQueueName());
       trc.setCompactor(rc.getCompactorAddress());
@@ -660,7 +700,7 @@ public class CompactionCoordinator extends AbstractServer
   @Override
   public void cancel(TInfo tinfo, TCredentials credentials, String 
externalCompactionId)
       throws TException {
-    var runningCompaction = 
RUNNING.get(ExternalCompactionId.of(externalCompactionId));
+    var runningCompaction = 
RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
     var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
     try {
       NamespaceId nsId = getContext().getNamespaceId(extent.tableId());
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 1715d84af5..f2c57358d1 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.metadata.TServerInstance;
@@ -87,6 +88,8 @@ public class CompactionCoordinatorTest {
     private final ServerAddress client;
     private final TabletClientService.Client tabletServerClient;
 
+    private Set<ExternalCompactionId> metadataCompactionIds = null;
+
     protected TestCoordinator(CompactionFinalizer finalizer, LiveTServerSet 
tservers,
         ServerAddress client, TabletClientService.Client tabletServerClient, 
ServerContext context,
         AuditedSecurityOperation security) {
@@ -158,6 +161,18 @@ public class CompactionCoordinatorTest {
     public void compactionFailed(TInfo tinfo, TCredentials credentials, String 
externalCompactionId,
         TKeyExtent extent) throws ThriftSecurityException {}
 
+    void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
+      metadataCompactionIds = mci;
+    }
+
+    protected Set<ExternalCompactionId> readExternalCompactionIds() {
+      if (metadataCompactionIds == null) {
+        return RUNNING_CACHE.keySet();
+      } else {
+        return metadataCompactionIds;
+      }
+    }
+
     public Map<String,TreeMap<Short,TreeSet<TServerInstance>>> getQueues() {
       return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES;
     }
@@ -167,13 +182,14 @@ public class CompactionCoordinatorTest {
     }
 
     public Map<ExternalCompactionId,RunningCompaction> getRunning() {
-      return RUNNING;
+      return RUNNING_CACHE;
     }
 
     public void resetInternals() {
       getQueues().clear();
       getIndex().clear();
       getRunning().clear();
+      metadataCompactionIds = null;
     }
 
   }
@@ -586,4 +602,50 @@ public class CompactionCoordinatorTest {
     coordinator.close();
   }
 
+  @Test
+  public void testCleanUpRunning() throws Exception {
+    PowerMock.resetAll();
+    PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+
+    ServerContext context = PowerMock.createNiceMock(ServerContext.class);
+    
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = PowerMock.createNiceMock(TCredentials.class);
+
+    CompactionFinalizer finalizer = 
PowerMock.createNiceMock(CompactionFinalizer.class);
+    LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
+
+    ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
+    HostAndPort address = HostAndPort.fromString("localhost:10240");
+    expect(client.getAddress()).andReturn(address).anyTimes();
+
+    TabletClientService.Client tsc = 
PowerMock.createNiceMock(TabletClientService.Client.class);
+
+    AuditedSecurityOperation security = 
PowerMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true);
+
+    PowerMock.replayAll();
+
+    var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, 
context, security);
+    coordinator.resetInternals();
+
+    var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
+
+    coordinator.getRunning().put(ecid1, new RunningCompaction(new 
TExternalCompaction()));
+    coordinator.getRunning().put(ecid2, new RunningCompaction(new 
TExternalCompaction()));
+    coordinator.getRunning().put(ecid3, new RunningCompaction(new 
TExternalCompaction()));
+
+    coordinator.cleanUpRunning();
+
+    assertEquals(Set.of(ecid1, ecid2, ecid3), 
coordinator.getRunning().keySet());
+
+    coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
+
+    coordinator.cleanUpRunning();
+
+    assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
+
+  }
 }

Reply via email to