This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new f3e75f0e46 Several small improvements to Compactor impl (#4342)
f3e75f0e46 is described below
commit f3e75f0e46a320d10f222546c33ada896c371e7c
Author: Christopher Tubbs <[email protected]>
AuthorDate: Fri Mar 15 16:53:53 2024 -0400
Several small improvements to Compactor impl (#4342)
* Start thread pools when the service is run, not when it is constructed
(and, importantly, after metrics have been fully initialized); this
ensures that the ServerContext, which is created during construction, is
available before thread pools and metrics are started
* Use the security object from the ServerContext
* Remove unneeded second constructor with config overrides (a test
subclass can provide the overridden config instead)
* Remove redundant printStartupMsg (the same details are already printed
by the AbstractServer base class)
* Make sure SuccessfulCompactor's close method is called, by using
try-with-resources, to remove the unclosed resource warning in
CompactorTest
---
.../org/apache/accumulo/compactor/Compactor.java | 44 +++++-----------------
.../apache/accumulo/compactor/CompactorTest.java | 27 ++++---------
2 files changed, 18 insertions(+), 53 deletions(-)
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 1434d2a1f1..5e09e48fdf 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -105,7 +105,6 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
-import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
@@ -142,13 +141,10 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
private final GarbageCollectionLogger gcLogger = new
GarbageCollectionLogger();
private final UUID compactorId = UUID.randomUUID();
- private final AccumuloConfiguration aconf;
private final String queueName;
protected final AtomicReference<ExternalCompactionId> currentCompactionId =
new AtomicReference<>();
- private final CompactionWatcher watcher;
- private SecurityOperation security;
private ServiceLock compactorLock;
private ServerAddress compactorAddress = null;
@@ -158,25 +154,8 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
private final AtomicBoolean compactionRunning = new AtomicBoolean(false);
protected Compactor(CompactorServerOpts opts, String[] args) {
- this(opts, args, null);
- }
-
- protected Compactor(CompactorServerOpts opts, String[] args,
AccumuloConfiguration conf) {
super("compactor", opts, args);
queueName = opts.getQueueName();
- aconf = conf == null ? super.getConfiguration() : conf;
- setupSecurity();
- watcher = new CompactionWatcher(aconf);
- var schedExecutor =
-
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
- startGCLogger(schedExecutor);
- startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
- printStartupMsg();
- }
-
- @Override
- public AccumuloConfiguration getConfiguration() {
- return aconf;
}
@Override
@@ -186,10 +165,6 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
CompactionWatcher.setTimer(timer);
}
- protected void setupSecurity() {
- security = getContext().getSecurityOperation();
- }
-
protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(() ->
gcLogger.logGCInfo(getConfiguration()), 0,
@@ -245,11 +220,6 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
}
}
- protected void printStartupMsg() {
- LOG.info("Version " + Constants.VERSION);
- LOG.info("Instance " + getContext().getInstanceID());
- }
-
/**
* Set up nodes and locks in ZooKeeper for this Compactor
*
@@ -359,7 +329,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
TableId tableId = JOB_HOLDER.getTableId();
try {
NamespaceId nsId = getContext().getNamespaceId(tableId);
- if (!security.canCompact(credentials, tableId, nsId)) {
+ if (!getContext().getSecurityOperation().canCompact(credentials,
tableId, nsId)) {
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
@@ -633,6 +603,12 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
}
MetricsUtil.initializeProducers(this);
+ var watcher = new CompactionWatcher(getConfiguration());
+ var schedExecutor = ThreadPools.getServerThreadPools()
+ .createGeneralScheduledExecutorService(getConfiguration());
+ startGCLogger(schedExecutor);
+ startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
+
LOG.info("Compactor started, waiting for work");
try {
@@ -831,7 +807,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
@Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials
credentials)
throws ThriftSecurityException, TException {
- if (!security.canPerformSystemActions(credentials)) {
+ if
(!getContext().getSecurityOperation().canPerformSystemActions(credentials)) {
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
@@ -858,7 +834,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials
credentials)
throws ThriftSecurityException, TException {
// do not expect users to call this directly, expect other tservers to
call this method
- if (!security.canPerformSystemActions(credentials)) {
+ if
(!getContext().getSecurityOperation().canPerformSystemActions(credentials)) {
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
@@ -883,7 +859,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
public String getRunningCompactionId(TInfo tinfo, TCredentials credentials)
throws ThriftSecurityException, TException {
// do not expect users to call this directly, expect other tservers to
call this method
- if (!security.canPerformSystemActions(credentials)) {
+ if
(!getContext().getSecurityOperation().canPerformSystemActions(credentials)) {
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
diff --git
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 2b8fa20b7c..f7e979820e 100644
---
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -36,7 +36,6 @@ import java.util.function.Supplier;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -176,7 +175,7 @@ public class CompactorTest {
SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address,
TExternalCompactionJob job,
ServerContext context, ExternalCompactionId eci) {
- super(new CompactorServerOpts(), new String[] {"-q", "testQ"},
context.getConfiguration());
+ super(new CompactorServerOpts(), new String[] {"-q", "testQ"});
this.uuid = uuid;
this.address = address;
this.job = job;
@@ -184,20 +183,9 @@ public class CompactorTest {
this.eci = eci;
}
- @Override
- public AccumuloConfiguration getConfiguration() {
- return context.getConfiguration();
- }
-
- @Override
- protected void setupSecurity() {}
-
@Override
protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {}
- @Override
- protected void printStartupMsg() {}
-
@Override
public ServerContext getContext() {
return this.context;
@@ -469,13 +457,14 @@ public class CompactorTest {
PowerMock.replayAll();
- SuccessfulCompactor c = new SuccessfulCompactor(null, null, null, context,
null);
- PowerMock.verifyAll();
+ try (var c = new SuccessfulCompactor(null, null, null, context, null)) {
+ Long maxWait = c.getWaitTimeBetweenCompactionChecks();
+ // compaction jitter means maxWait is between 0.9 and 1.1 of the desired
value.
+ assertTrue(maxWait >= 720L);
+ assertTrue(maxWait <= 968L);
+ }
- Long maxWait = c.getWaitTimeBetweenCompactionChecks();
- // compaction jitter means maxWait is between 0.9 and 1.1 of the desired
value.
- assertTrue(maxWait >= 720L);
- assertTrue(maxWait <= 968L);
+ PowerMock.verifyAll();
}
}