This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 202198a588 Updates compaction to use TabletLogger (#4333)
202198a588 is described below
commit 202198a588aae1096611ff9b5513075054e2a876
Author: Keith Turner <[email protected]>
AuthorDate: Mon Mar 4 10:55:50 2024 -0500
Updates compaction to use TabletLogger (#4333)
---
.../apache/accumulo/core/logging/TabletLogger.java | 49 +++++++++++-----------
.../coordinator/CompactionCoordinator.java | 9 ++--
.../coordinator/commit/CommitCompaction.java | 4 ++
.../manager/tableOps/compact/CompactionDriver.java | 22 ++++++++--
test/src/main/resources/log4j2-test.properties | 3 ++
5 files changed, 55 insertions(+), 32 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index 349d29b19f..e76c62a6c9 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@ -27,14 +27,16 @@ import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -127,40 +129,36 @@ public class TabletLogger {
cf -> CompactableFileImpl.toStoredTabletFile(cf).toMinimalString());
}
- public static void selected(KeyExtent extent, CompactionKind kind,
+ public static void selected(FateId fateId, KeyExtent extent,
Collection<StoredTabletFile> inputs) {
- fileLog.trace("{} changed compaction selection set for {} new set {}",
extent, kind,
+ fileLog.trace("Selected files {} {} {}", extent, fateId,
Collections2.transform(inputs, StoredTabletFile::toMinimalString));
}
- public static void compacting(KeyExtent extent, CompactionJob job,
CompactionConfig config) {
+ public static void compacting(TabletMetadata tabletMetadata,
ExternalCompactionId cid,
+ String compactorAddress, CompactionJob job) {
if (fileLog.isDebugEnabled()) {
- if (config == null) {
- fileLog.debug("Compacting {} on {} for {} from {} size {}", extent,
job.getGroup(),
- job.getKind(), asMinimalString(job.getFiles()),
getSize(job.getFiles()));
+ if (job.getKind() == CompactionKind.USER) {
+ var fateId = tabletMetadata.getSelectedFiles().getFateId();
+ fileLog.debug(
+ "Compacting {} driver:{} id:{} group:{} compactor:{} priority:{}
size:{} kind:{} files:{}",
+ tabletMetadata.getExtent(), fateId, cid, job.getGroup(),
compactorAddress,
+ job.getPriority(), getSize(job.getFiles()), job.getKind(),
+ asMinimalString(job.getFiles()));
} else {
- fileLog.debug("Compacting {} on {} for {} from {} size {} config {}",
extent,
- job.getGroup(), job.getKind(), asMinimalString(job.getFiles()),
getSize(job.getFiles()),
- config);
+ fileLog.debug(
+ "Compacting {} id:{} group:{} compactor:{} priority:{} size:{}
kind:{} files:{}",
+ tabletMetadata.getExtent(), cid, job.getGroup(), compactorAddress,
job.getPriority(),
+ getSize(job.getFiles()), job.getKind(),
asMinimalString(job.getFiles()));
}
}
}
- public static void compacted(KeyExtent extent, CompactionJob job,
StoredTabletFile output) {
- fileLog.debug("Compacted {} for {} created {} from {}", extent,
job.getKind(), output,
- asMinimalString(job.getFiles()));
- }
-
- public static void compactionFailed(KeyExtent extent, CompactionJob job,
- CompactionConfig config) {
- fileLog.debug("Failed to compact: extent: {}, input files: {}, iterators:
{}", extent,
- asMinimalString(job.getFiles()), config.getIterators());
- }
-
- public static void externalCompactionFailed(KeyExtent extent,
ExternalCompactionId id,
- CompactionJob job, CompactionConfig config) {
- fileLog.debug("Failed to compact: id: {}, extent: {}, input files: {},
iterators: {}", id,
- extent, asMinimalString(job.getFiles()), config.getIterators());
+ public static void compacted(KeyExtent extent, ExternalCompactionId ecid,
CompactionKind kind,
+ Collection<StoredTabletFile> inputs, Optional<ReferencedTabletFile>
output) {
+ var transformed = Collections2.transform(inputs,
StoredTabletFile::toMinimalString);
+ fileLog.debug("{} compacted {} for {} created {} from {}", ecid, extent,
kind,
+ output.map(f -> f + "").orElse("no output"), transformed);
}
public static void flushed(KeyExtent extent, Optional<StoredTabletFile>
newDatafile) {
@@ -198,4 +196,5 @@ public class TabletLogger {
public static void walRefsChanged(KeyExtent extent, Collection<String>
refsSupplier) {
walsLog.trace("{} has unflushed data in wals: {} ", extent, refsSupplier);
}
+
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index a07ff50bc4..d36ec662d5 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -74,6 +74,7 @@ import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
+import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
@@ -347,10 +348,10 @@ public class CompactionCoordinator
// Only reserve user compactions when the config is present. When
compactions are canceled the
// config is deleted.
+ var cid = ExternalCompactionId.from(externalCompactionId);
if (kind == CompactionKind.SYSTEM
|| (kind == CompactionKind.USER && compactionConfig.isPresent())) {
- ecm = reserveCompaction(metaJob, compactorAddress,
- ExternalCompactionId.from(externalCompactionId));
+ ecm = reserveCompaction(metaJob, compactorAddress, cid);
}
if (ecm != null) {
@@ -359,8 +360,8 @@ public class CompactionCoordinator
// is dead. In this cases the compaction is not actually running.
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
new RunningCompaction(result, compactorAddress, groupName));
- LOG.debug("Returning external job {} to {} with {} files",
result.externalCompactionId,
- compactorAddress, ecm.getJobFiles().size());
+ TabletLogger.compacting(metaJob.getTabletMetadata(), cid,
compactorAddress,
+ metaJob.getJob());
break;
} else {
LOG.debug(
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index 2ff3a30386..cc0432d4a0 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.metadata.AbstractTabletFile;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -136,6 +137,9 @@ public class CommitCompaction extends ManagerRepo {
var result = tabletsMutator.process().get(getExtent());
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
+ // Compaction was successfully committed to the tablet so log it
+ TabletLogger.compacted(getExtent(), ecid, commitData.kind,
commitData.getJobFiles(),
+ newDatafile);
break;
} else {
// compaction failed to commit, maybe something changed on the
tablet so lets reread the
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index b3f373cb16..f8ad23172b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -28,6 +28,7 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import java.time.Duration;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -44,6 +45,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.metadata.AbstractTabletFile;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -137,12 +139,24 @@ class CompactionDriver extends ManagerRepo {
var ample = manager.getContext().getAmple();
- // ELASTICITY_TODO use existing compaction logging
+ // This map tracks tablets that had a conditional mutation submitted to
select files. If the
+ // conditional mutation is successful then want to log a message. Use a
concurrent map as the
+ // result consumer may run in another thread.
+ ConcurrentHashMap<KeyExtent,Set<StoredTabletFile>> selectionsSubmitted =
+ new ConcurrentHashMap<>();
Consumer<Ample.ConditionalResult> resultConsumer = result -> {
if (result.getStatus() == Status.REJECTED) {
log.debug("{} update for {} was rejected ", fateId,
result.getExtent());
}
+
+ // always remove extents from the map even if not successful in order to
avoid placing too
+ // many in memory
+ var selected = selectionsSubmitted.remove(result.getExtent());
+ if (selected != null && result.getStatus() == Status.ACCEPTED) {
+ // successfully selected files so log this
+ TabletLogger.selected(fateId, result.getExtent(), selected);
+ }
};
long t1 = System.currentTimeMillis();
@@ -229,6 +243,8 @@ class CompactionDriver extends ManagerRepo {
mutator.putSelectedFiles(selectedFiles);
+ selectionsSubmitted.put(tablet.getExtent(), filesToCompact);
+
mutator.submit(tabletMetadata -> tabletMetadata.getSelectedFiles()
!= null
&& tabletMetadata.getSelectedFiles().getMetadataValue()
.equals(selectedFiles.getMetadataValue()));
@@ -261,7 +277,7 @@ class CompactionDriver extends ManagerRepo {
// If there are compactions preventing selection of files, then add
// selecting marker that prevents new compactions from starting
if (!tablet.getUserCompactionsRequested().contains(fateId)) {
- log.debug(
+ log.trace(
"Another compaction exists for {}, Marking {} as needing a
user requested compaction",
tablet.getExtent(), fateId);
var mutator =
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
@@ -271,7 +287,7 @@ class CompactionDriver extends ManagerRepo {
userCompactionRequested++;
} else {
// Marker was already added and we are waiting
- log.debug("Waiting on {} for previously marked user requested
compaction {} to run",
+ log.trace("Waiting on {} for previously marked user requested
compaction {} to run",
tablet.getExtent(), fateId);
userCompactionWaiting++;
}
diff --git a/test/src/main/resources/log4j2-test.properties
b/test/src/main/resources/log4j2-test.properties
index 70276bda59..7a92825296 100644
--- a/test/src/main/resources/log4j2-test.properties
+++ b/test/src/main/resources/log4j2-test.properties
@@ -142,6 +142,9 @@ logger.38.level = debug
logger.39.name = org.apache.accumulo.manager.Manager
logger.39.level = trace
+logger.40.name = org.apache.accumulo.tablet
+logger.40.level = trace
+
property.metricsFilename = ./target/test-metrics
# appender.metrics.type = Console