This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 703710e  Fix CompactionCoordinatorTest
703710e is described below

commit 703710e4816bd8dc4a3c0f9bd49d489a5e8b1009
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Apr 20 13:28:32 2021 +0000

    Fix CompactionCoordinatorTest
---
 .../coordinator/CompactionCoordinator.java         | 19 ++---
 .../coordinator/CompactionCoordinatorTest.java     | 93 +++++++++++-----------
 .../org/apache/accumulo/compactor/Compactor.java   |  5 +-
 3 files changed, 59 insertions(+), 58 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index c169701..bf1c0c8 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -93,7 +93,8 @@ public class CompactionCoordinator extends AbstractServer
   private static final long FIFTEEN_MINUTES =
       TimeUnit.MILLISECONDS.convert(Duration.of(15, 
TimeUnit.MINUTES.toChronoUnit()));
 
-  QueueSummaries queueSummaries = new QueueSummaries();
+  protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
+
   /* Map of compactionId to RunningCompactions */
   protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
       new ConcurrentHashMap<>();
@@ -373,7 +374,7 @@ public class CompactionCoordinator extends AbstractServer
                   QueueAndPriority.get(summary.getQueue().intern(), 
summary.getPriority());
               synchronized (qp) {
                 TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(qp.getQueue(), k 
-> 0L);
-                queueSummaries.update(tsi, summaries);
+                QUEUE_SUMMARIES.update(tsi, summaries);
               }
             });
           } finally {
@@ -382,7 +383,7 @@ public class CompactionCoordinator extends AbstractServer
         } catch (TException e) {
           LOG.warn("Error getting external compaction summaries from tablet 
server: {}",
               tsi.getHostAndPort(), e);
-          queueSummaries.remove(Set.of(tsi));
+          QUEUE_SUMMARIES.remove(Set.of(tsi));
         }
       });
 
@@ -435,7 +436,7 @@ public class CompactionCoordinator extends AbstractServer
     // run() will iterate over the current and added tservers and add them to 
the internal
     // data structures. For tservers that are deleted, we need to remove them 
from QUEUES
     // and INDEX
-    queueSummaries.remove(deleted);
+    QUEUE_SUMMARIES.remove(deleted);
   }
 
   /**
@@ -462,7 +463,7 @@ public class CompactionCoordinator extends AbstractServer
 
     TExternalCompactionJob result = null;
 
-    PrioTserver prioTserver = queueSummaries.getNextTserver(queueName);
+    PrioTserver prioTserver = QUEUE_SUMMARIES.getNextTserver(queueName);
 
     while (prioTserver != null) {
       TServerInstance tserver = prioTserver.tserver;
@@ -479,8 +480,8 @@ public class CompactionCoordinator extends AbstractServer
           LOG.debug("No compactions found for queue {} on tserver {}, trying 
next tserver", queue,
               tserver.getHostAndPort(), compactorAddress);
 
-          queueSummaries.removeSummary(tserver, queueName, prioTserver.prio);
-          prioTserver = queueSummaries.getNextTserver(queueName);
+          QUEUE_SUMMARIES.removeSummary(tserver, queueName, prioTserver.prio);
+          prioTserver = QUEUE_SUMMARIES.getNextTserver(queueName);
           continue;
         }
         RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
@@ -491,8 +492,8 @@ public class CompactionCoordinator extends AbstractServer
       } catch (TException e) {
         LOG.warn("Error from tserver {} while trying to reserve compaction, 
trying next tserver",
             
ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
-        queueSummaries.removeSummary(tserver, queueName, prioTserver.prio);
-        prioTserver = queueSummaries.getNextTserver(queueName);
+        QUEUE_SUMMARIES.removeSummary(tserver, queueName, prioTserver.prio);
+        prioTserver = QUEUE_SUMMARIES.getNextTserver(queueName);
       } finally {
         ThriftUtil.returnClient(client);
       }
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 6436297..5de768f 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -26,12 +26,11 @@ import static org.junit.Assert.assertTrue;
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -151,20 +150,24 @@ public class CompactionCoordinatorTest {
     public void compactionFailed(TInfo tinfo, TCredentials credentials, String 
externalCompactionId,
         TKeyExtent extent) throws TException {}
 
-    public Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> 
getQueues() {
-      // CBUG todo refactor test
-      return null;
+    public Map<String,TreeMap<Long,TreeSet<TServerInstance>>> getQueues() {
+      return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES;
     }
 
-    public Map<TServerInstance,HashSet<QueueAndPriority>> getIndex() {
-      // CBUG todo refactor test
-      return null;
+    public Map<TServerInstance,Set<QueueAndPriority>> getIndex() {
+      return CompactionCoordinator.QUEUE_SUMMARIES.INDEX;
     }
 
     public Map<ExternalCompactionId,RunningCompaction> getRunning() {
       return RUNNING;
     }
 
+    public void resetInternals() {
+      getQueues().clear();
+      getIndex().clear();
+      getRunning().clear();
+    }
+
     @Override
     protected TabletMetadata getMetadataEntryForExtent(KeyExtent extent) {
       return new TabletMetadata() {
@@ -187,7 +190,8 @@ public class CompactionCoordinatorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
     PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
-    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions"));
+    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions",
+        "detectDanglingFinalStateMarkers"));
 
     AccumuloConfiguration conf = 
PowerMock.createNiceMock(AccumuloConfiguration.class);
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
@@ -213,6 +217,7 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, 
security);
+    coordinator.resetInternals();
     assertEquals(0, coordinator.getQueues().size());
     assertEquals(0, coordinator.getIndex().size());
     assertEquals(0, coordinator.getRunning().size());
@@ -222,9 +227,7 @@ public class CompactionCoordinatorTest {
     assertEquals(0, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
-    coordinator.getQueues().clear();
-    coordinator.getIndex().clear();
-    coordinator.getRunning().clear();
+    coordinator.resetInternals();
     coordinator.close();
   }
 
@@ -233,7 +236,8 @@ public class CompactionCoordinatorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
     PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
-    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions"));
+    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions",
+        "detectDanglingFinalStateMarkers"));
 
     AccumuloConfiguration conf = 
PowerMock.createNiceMock(AccumuloConfiguration.class);
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
@@ -257,8 +261,8 @@ public class CompactionCoordinatorTest {
     TCompactionQueueSummary queueSummary = 
PowerMock.createNiceMock(TCompactionQueueSummary.class);
     EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), 
EasyMock.anyObject()))
         .andReturn(Collections.singletonList(queueSummary)).anyTimes();
-    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ");
-    EasyMock.expect(queueSummary.getPriority()).andReturn(1L);
+    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
+    EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes();
 
     AuditedSecurityOperation security = 
PowerMock.createNiceMock(AuditedSecurityOperation.class);
 
@@ -266,13 +270,14 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, 
security);
+    coordinator.resetInternals();
     assertEquals(0, coordinator.getQueues().size());
     assertEquals(0, coordinator.getIndex().size());
     assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
     assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
-    Map<Long,LinkedHashSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
+    Map<Long,TreeSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
     assertNotNull(m);
     assertEquals(1, m.size());
     assertTrue(m.containsKey(1L));
@@ -289,9 +294,7 @@ public class CompactionCoordinatorTest {
     assertEquals(0, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
-    coordinator.getQueues().clear();
-    coordinator.getIndex().clear();
-    coordinator.getRunning().clear();
+    coordinator.resetInternals();
     coordinator.close();
   }
 
@@ -300,7 +303,8 @@ public class CompactionCoordinatorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
     PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
-    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions"));
+    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions",
+        "detectDanglingFinalStateMarkers"));
 
     AccumuloConfiguration conf = 
PowerMock.createNiceMock(AccumuloConfiguration.class);
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
@@ -319,9 +323,6 @@ public class CompactionCoordinatorTest {
 
     PowerMock.mockStatic(ExternalCompactionUtil.class);
     Map<HostAndPort,TExternalCompactionJob> runningCompactions = new 
HashMap<>();
-    // ExternalCompactionId eci = 
ExternalCompactionId.generate(UUID.randomUUID());
-    // TExternalCompactionJob job = 
PowerMock.createNiceMock(TExternalCompactionJob.class);
-    // runningCompactions.put(tserverAddress, job);
     
EasyMock.expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(ctx))
         .andReturn(runningCompactions);
 
@@ -335,8 +336,8 @@ public class CompactionCoordinatorTest {
     TCompactionQueueSummary queueSummary = 
PowerMock.createNiceMock(TCompactionQueueSummary.class);
     EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), 
EasyMock.anyObject()))
         .andReturn(Collections.singletonList(queueSummary)).anyTimes();
-    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ");
-    EasyMock.expect(queueSummary.getPriority()).andReturn(1L);
+    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
+    EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes();
 
     AuditedSecurityOperation security = 
PowerMock.createNiceMock(AuditedSecurityOperation.class);
 
@@ -344,13 +345,14 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, 
security);
+    coordinator.resetInternals();
     assertEquals(0, coordinator.getQueues().size());
     assertEquals(0, coordinator.getIndex().size());
     assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
     assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
-    Map<Long,LinkedHashSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
+    Map<Long,TreeSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
     assertNotNull(m);
     assertEquals(1, m.size());
     assertTrue(m.containsKey(1L));
@@ -367,9 +369,7 @@ public class CompactionCoordinatorTest {
     assertEquals(0, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
-    coordinator.getQueues().clear();
-    coordinator.getIndex().clear();
-    coordinator.getRunning().clear();
+    coordinator.resetInternals();
     coordinator.close();
   }
 
@@ -379,7 +379,8 @@ public class CompactionCoordinatorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
     PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
-    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions"));
+    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions",
+        "detectDanglingFinalStateMarkers"));
 
     AccumuloConfiguration conf = 
PowerMock.createNiceMock(AccumuloConfiguration.class);
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
@@ -420,8 +421,8 @@ public class CompactionCoordinatorTest {
     TCompactionQueueSummary queueSummary = 
PowerMock.createNiceMock(TCompactionQueueSummary.class);
     EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), 
EasyMock.anyObject()))
         .andReturn(Collections.singletonList(queueSummary)).anyTimes();
-    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ");
-    EasyMock.expect(queueSummary.getPriority()).andReturn(1L);
+    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
+    EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes();
     EasyMock.expect(tsc.isRunningExternalCompaction(EasyMock.anyObject(), 
EasyMock.anyObject(),
         EasyMock.anyObject(), EasyMock.anyObject())).andReturn(true);
 
@@ -431,13 +432,14 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, 
security);
+    coordinator.resetInternals();
     assertEquals(0, coordinator.getQueues().size());
     assertEquals(0, coordinator.getIndex().size());
     assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
     assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
-    Map<Long,LinkedHashSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
+    Map<Long,TreeSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
     assertNotNull(m);
     assertEquals(1, m.size());
     assertTrue(m.containsKey(1L));
@@ -454,9 +456,7 @@ public class CompactionCoordinatorTest {
     assertEquals(1, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
-    coordinator.getQueues().clear();
-    coordinator.getIndex().clear();
-    coordinator.getRunning().clear();
+    coordinator.resetInternals();
     coordinator.close();
   }
 
@@ -465,7 +465,8 @@ public class CompactionCoordinatorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
     PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
-    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions"));
+    PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, 
"detectDeadCompactions",
+        "detectDanglingFinalStateMarkers"));
 
     AccumuloConfiguration conf = 
PowerMock.createNiceMock(AccumuloConfiguration.class);
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
@@ -491,8 +492,8 @@ public class CompactionCoordinatorTest {
     TCompactionQueueSummary queueSummary = 
PowerMock.createNiceMock(TCompactionQueueSummary.class);
     EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), 
EasyMock.anyObject()))
         .andReturn(Collections.singletonList(queueSummary)).anyTimes();
-    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ");
-    EasyMock.expect(queueSummary.getPriority()).andReturn(1L);
+    EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
+    EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes();
 
     ExternalCompactionId eci = 
ExternalCompactionId.generate(UUID.randomUUID());
     TExternalCompactionJob job = 
PowerMock.createNiceMock(TExternalCompactionJob.class);
@@ -510,6 +511,7 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, 
security);
+    coordinator.resetInternals();
     assertEquals(0, coordinator.getQueues().size());
     assertEquals(0, coordinator.getIndex().size());
     assertEquals(0, coordinator.getRunning().size());
@@ -519,7 +521,7 @@ public class CompactionCoordinatorTest {
 
     assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
-    Map<Long,LinkedHashSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
+    Map<Long,TreeSet<TServerInstance>> m = 
coordinator.getQueues().get("R2DQ".intern());
     assertNotNull(m);
     assertEquals(1, m.size());
     assertTrue(m.containsKey(1L));
@@ -541,7 +543,7 @@ public class CompactionCoordinatorTest {
     assertEquals(eci.toString(), createdJob.getExternalCompactionId());
 
     assertEquals(1, coordinator.getQueues().size());
-    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(1, coordinator.getIndex().size());
     assertEquals(1, coordinator.getRunning().size());
     Entry<ExternalCompactionId,RunningCompaction> entry =
         coordinator.getRunning().entrySet().iterator().next();
@@ -550,9 +552,7 @@ public class CompactionCoordinatorTest {
     assertEquals(eci.toString(), 
entry.getValue().getJob().getExternalCompactionId());
 
     PowerMock.verifyAll();
-    coordinator.getQueues().clear();
-    coordinator.getIndex().clear();
-    coordinator.getRunning().clear();
+    coordinator.resetInternals();
     coordinator.close();
 
   }
@@ -582,14 +582,13 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, 
security);
-    coordinator.getQueues().clear();
-    coordinator.getIndex().clear();
-    coordinator.getRunning().clear();
+    coordinator.resetInternals();
     TExternalCompactionJob job = 
coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, "R2DQ",
         "localhost:10240", UUID.randomUUID().toString());
     assertNull(job.getExternalCompactionId());
 
     PowerMock.verifyAll();
+    coordinator.resetInternals();
     coordinator.close();
   }
 
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 8246fcc..10d1cfa 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -115,8 +115,9 @@ public class Compactor extends AbstractServer
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final CompactionJobHolder JOB_HOLDER = new 
CompactionJobHolder();
   private static final long TEN_MEGABYTES = 10485760;
-  private static final CompactionCoordinator.Client.Factory 
COORDINATOR_CLIENT_FACTORY = new CompactionCoordinator.Client.Factory();
-  
+  private static final CompactionCoordinator.Client.Factory 
COORDINATOR_CLIENT_FACTORY =
+      new CompactionCoordinator.Client.Factory();
+
   private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;

Reply via email to