This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 703710e Fix CompactionCoordinatorTest 703710e is described below commit 703710e4816bd8dc4a3c0f9bd49d489a5e8b1009 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Apr 20 13:28:32 2021 +0000 Fix CompactionCoordinatorTest --- .../coordinator/CompactionCoordinator.java | 19 ++--- .../coordinator/CompactionCoordinatorTest.java | 93 +++++++++++----------- .../org/apache/accumulo/compactor/Compactor.java | 5 +- 3 files changed, 59 insertions(+), 58 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index c169701..bf1c0c8 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -93,7 +93,8 @@ public class CompactionCoordinator extends AbstractServer private static final long FIFTEEN_MINUTES = TimeUnit.MILLISECONDS.convert(Duration.of(15, TimeUnit.MINUTES.toChronoUnit())); - QueueSummaries queueSummaries = new QueueSummaries(); + protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries(); + /* Map of compactionId to RunningCompactions */ protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING = new ConcurrentHashMap<>(); @@ -373,7 +374,7 @@ public class CompactionCoordinator extends AbstractServer QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority()); synchronized (qp) { TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(qp.getQueue(), k -> 0L); - queueSummaries.update(tsi, summaries); + QUEUE_SUMMARIES.update(tsi, summaries); } }); } finally { @@ -382,7 +383,7 @@ public class CompactionCoordinator extends AbstractServer } catch (TException e) { LOG.warn("Error getting external compaction summaries from tablet server: {}", tsi.getHostAndPort(), e); - queueSummaries.remove(Set.of(tsi)); + QUEUE_SUMMARIES.remove(Set.of(tsi)); } }); @@ -435,7 +436,7 @@ public class CompactionCoordinator extends AbstractServer // run() will iterate over the current and added tservers and add them to the internal // data structures. For tservers that are deleted, we need to remove them from QUEUES // and INDEX - queueSummaries.remove(deleted); + QUEUE_SUMMARIES.remove(deleted); } /** @@ -462,7 +463,7 @@ public class CompactionCoordinator extends AbstractServer TExternalCompactionJob result = null; - PrioTserver prioTserver = queueSummaries.getNextTserver(queueName); + PrioTserver prioTserver = QUEUE_SUMMARIES.getNextTserver(queueName); while (prioTserver != null) { TServerInstance tserver = prioTserver.tserver; @@ -479,8 +480,8 @@ public class CompactionCoordinator extends AbstractServer LOG.debug("No compactions found for queue {} on tserver {}, trying next tserver", queue, tserver.getHostAndPort(), compactorAddress); - queueSummaries.removeSummary(tserver, queueName, prioTserver.prio); - prioTserver = queueSummaries.getNextTserver(queueName); + QUEUE_SUMMARIES.removeSummary(tserver, queueName, prioTserver.prio); + prioTserver = QUEUE_SUMMARIES.getNextTserver(queueName); continue; } RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), @@ -491,8 +492,8 @@ public class CompactionCoordinator extends AbstractServer } catch (TException e) { LOG.warn("Error from tserver {} while trying to reserve compaction, trying next tserver", ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e); - queueSummaries.removeSummary(tserver, queueName, prioTserver.prio); - prioTserver = queueSummaries.getNextTserver(queueName); + QUEUE_SUMMARIES.removeSummary(tserver, queueName, prioTserver.prio); + prioTserver = QUEUE_SUMMARIES.getNextTserver(queueName); } finally { ThriftUtil.returnClient(client); } diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 6436297..5de768f 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -26,12 +26,11 @@ import static org.junit.Assert.assertTrue; import java.net.UnknownHostException; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -151,20 +150,24 @@ public class CompactionCoordinatorTest { public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, TKeyExtent extent) throws TException {} - public Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> getQueues() { - // CBUG todo refactor test - return null; + public Map<String,TreeMap<Long,TreeSet<TServerInstance>>> getQueues() { + return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES; } - public Map<TServerInstance,HashSet<QueueAndPriority>> getIndex() { - // CBUG todo refactor test - return null; + public Map<TServerInstance,Set<QueueAndPriority>> getIndex() { + return CompactionCoordinator.QUEUE_SUMMARIES.INDEX; } public Map<ExternalCompactionId,RunningCompaction> getRunning() { return RUNNING; } + public void resetInternals() { + getQueues().clear(); + getIndex().clear(); + getRunning().clear(); + } + @Override protected TabletMetadata getMetadataEntryForExtent(KeyExtent extent) { return new TabletMetadata() { @@ -187,7 +190,8 @@ public class CompactionCoordinatorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient")); - PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions")); + PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions", + "detectDanglingFinalStateMarkers")); AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class); ServerContext ctx = PowerMock.createNiceMock(ServerContext.class); @@ -213,6 +217,7 @@ public class CompactionCoordinatorTest { TestCoordinator coordinator = new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security); + coordinator.resetInternals(); assertEquals(0, coordinator.getQueues().size()); assertEquals(0, coordinator.getIndex().size()); assertEquals(0, coordinator.getRunning().size()); @@ -222,9 +227,7 @@ public class CompactionCoordinatorTest { assertEquals(0, coordinator.getRunning().size()); PowerMock.verifyAll(); - coordinator.getQueues().clear(); - coordinator.getIndex().clear(); - coordinator.getRunning().clear(); + coordinator.resetInternals(); coordinator.close(); } @@ -233,7 +236,8 @@ public class CompactionCoordinatorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient")); - PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions")); + PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions", + "detectDanglingFinalStateMarkers")); AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class); ServerContext ctx = PowerMock.createNiceMock(ServerContext.class); @@ -257,8 +261,8 @@ public class CompactionCoordinatorTest { TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class); EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ"); - EasyMock.expect(queueSummary.getPriority()).andReturn(1L); + EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); + EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes(); AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class); @@ -266,13 +270,14 @@ public class CompactionCoordinatorTest { TestCoordinator coordinator = new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security); + coordinator.resetInternals(); assertEquals(0, coordinator.getQueues().size()); assertEquals(0, coordinator.getIndex().size()); assertEquals(0, coordinator.getRunning().size()); coordinator.run(); assertEquals(1, coordinator.getQueues().size()); QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L); - Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); + Map<Long,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); assertNotNull(m); assertEquals(1, m.size()); assertTrue(m.containsKey(1L)); @@ -289,9 +294,7 @@ public class CompactionCoordinatorTest { assertEquals(0, coordinator.getRunning().size()); PowerMock.verifyAll(); - coordinator.getQueues().clear(); - coordinator.getIndex().clear(); - coordinator.getRunning().clear(); + coordinator.resetInternals(); coordinator.close(); } @@ -300,7 +303,8 @@ public class CompactionCoordinatorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient")); - PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions")); + PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions", + "detectDanglingFinalStateMarkers")); AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class); ServerContext ctx = PowerMock.createNiceMock(ServerContext.class); @@ -319,9 +323,6 @@ public class CompactionCoordinatorTest { PowerMock.mockStatic(ExternalCompactionUtil.class); Map<HostAndPort,TExternalCompactionJob> runningCompactions = new HashMap<>(); - // ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); - // TExternalCompactionJob job = PowerMock.createNiceMock(TExternalCompactionJob.class); - // runningCompactions.put(tserverAddress, job); EasyMock.expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(ctx)) .andReturn(runningCompactions); @@ -335,8 +336,8 @@ public class CompactionCoordinatorTest { TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class); EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ"); - EasyMock.expect(queueSummary.getPriority()).andReturn(1L); + EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); + EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes(); AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class); @@ -344,13 +345,14 @@ public class CompactionCoordinatorTest { TestCoordinator coordinator = new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security); + coordinator.resetInternals(); assertEquals(0, coordinator.getQueues().size()); assertEquals(0, coordinator.getIndex().size()); assertEquals(0, coordinator.getRunning().size()); coordinator.run(); assertEquals(1, coordinator.getQueues().size()); QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L); - Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); + Map<Long,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); assertNotNull(m); assertEquals(1, m.size()); assertTrue(m.containsKey(1L)); @@ -367,9 +369,7 @@ public class CompactionCoordinatorTest { assertEquals(0, coordinator.getRunning().size()); PowerMock.verifyAll(); - coordinator.getQueues().clear(); - coordinator.getIndex().clear(); - coordinator.getRunning().clear(); + coordinator.resetInternals(); coordinator.close(); } @@ -379,7 +379,8 @@ public class CompactionCoordinatorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient")); - PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions")); + PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions", + "detectDanglingFinalStateMarkers")); AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class); ServerContext ctx = PowerMock.createNiceMock(ServerContext.class); @@ -420,8 +421,8 @@ public class CompactionCoordinatorTest { TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class); EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ"); - EasyMock.expect(queueSummary.getPriority()).andReturn(1L); + EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); + EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes(); EasyMock.expect(tsc.isRunningExternalCompaction(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).andReturn(true); @@ -431,13 +432,14 @@ public class CompactionCoordinatorTest { TestCoordinator coordinator = new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security); + coordinator.resetInternals(); assertEquals(0, coordinator.getQueues().size()); assertEquals(0, coordinator.getIndex().size()); assertEquals(0, coordinator.getRunning().size()); coordinator.run(); assertEquals(1, coordinator.getQueues().size()); QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L); - Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); + Map<Long,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); assertNotNull(m); assertEquals(1, m.size()); assertTrue(m.containsKey(1L)); @@ -454,9 +456,7 @@ public class CompactionCoordinatorTest { assertEquals(1, coordinator.getRunning().size()); PowerMock.verifyAll(); - coordinator.getQueues().clear(); - coordinator.getIndex().clear(); - coordinator.getRunning().clear(); + coordinator.resetInternals(); coordinator.close(); } @@ -465,7 +465,8 @@ public class CompactionCoordinatorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient")); - PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions")); + PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions", + "detectDanglingFinalStateMarkers")); AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class); ServerContext ctx = PowerMock.createNiceMock(ServerContext.class); @@ -491,8 +492,8 @@ public class CompactionCoordinatorTest { TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class); EasyMock.expect(tsc.getCompactionQueueInfo(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(Collections.singletonList(queueSummary)).anyTimes(); - EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ"); - EasyMock.expect(queueSummary.getPriority()).andReturn(1L); + EasyMock.expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes(); + EasyMock.expect(queueSummary.getPriority()).andReturn(1L).anyTimes(); ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); TExternalCompactionJob job = PowerMock.createNiceMock(TExternalCompactionJob.class); @@ -510,6 +511,7 @@ public class CompactionCoordinatorTest { TestCoordinator coordinator = new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security); + coordinator.resetInternals(); assertEquals(0, coordinator.getQueues().size()); assertEquals(0, coordinator.getIndex().size()); assertEquals(0, coordinator.getRunning().size()); @@ -519,7 +521,7 @@ public class CompactionCoordinatorTest { assertEquals(1, coordinator.getQueues().size()); QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L); - Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); + Map<Long,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern()); assertNotNull(m); assertEquals(1, m.size()); assertTrue(m.containsKey(1L)); @@ -541,7 +543,7 @@ public class CompactionCoordinatorTest { assertEquals(eci.toString(), createdJob.getExternalCompactionId()); assertEquals(1, coordinator.getQueues().size()); - assertEquals(0, coordinator.getIndex().size()); + assertEquals(1, coordinator.getIndex().size()); assertEquals(1, coordinator.getRunning().size()); Entry<ExternalCompactionId,RunningCompaction> entry = coordinator.getRunning().entrySet().iterator().next(); @@ -550,9 +552,7 @@ public class CompactionCoordinatorTest { assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId()); PowerMock.verifyAll(); - coordinator.getQueues().clear(); - coordinator.getIndex().clear(); - coordinator.getRunning().clear(); + coordinator.resetInternals(); coordinator.close(); } @@ -582,14 +582,13 @@ public class CompactionCoordinatorTest { TestCoordinator coordinator = new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security); - coordinator.getQueues().clear(); - coordinator.getIndex().clear(); - coordinator.getRunning().clear(); + coordinator.resetInternals(); TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, "R2DQ", "localhost:10240", UUID.randomUUID().toString()); assertNull(job.getExternalCompactionId()); PowerMock.verifyAll(); + coordinator.resetInternals(); coordinator.close(); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 8246fcc..10d1cfa 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -115,8 +115,9 @@ public class Compactor extends AbstractServer private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); private static final long TEN_MEGABYTES = 10485760; - private static final CompactionCoordinator.Client.Factory COORDINATOR_CLIENT_FACTORY = new CompactionCoordinator.Client.Factory(); - + private static final CompactionCoordinator.Client.Factory COORDINATOR_CLIENT_FACTORY = + new CompactionCoordinator.Client.Factory(); + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); private final UUID compactorId = UUID.randomUUID(); private final AccumuloConfiguration aconf;