This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 212e3cd27c updates bulk import and compaction logging (#4440)
212e3cd27c is described below
commit 212e3cd27cf9701e750ca8c3a2566a74d88867f3
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 10 10:56:01 2024 -0400
updates bulk import and compaction logging (#4440)
---
.../coordinator/commit/CommitCompaction.java | 20 ++++----
.../manager/tableOps/bulkVer2/LoadFiles.java | 54 +++++++++++++---------
.../manager/tableOps/compact/CompactionDriver.java | 2 -
3 files changed, 41 insertions(+), 35 deletions(-)
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index cc0432d4a0..7add060466 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -130,10 +130,11 @@ public class CommitCompaction extends ManagerRepo {
tabletMutator
.submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid));
- // TODO expensive logging
- LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile,
- ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
- .collect(Collectors.toList()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile,
+ ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
+ .collect(Collectors.toList()));
+ }
var result = tabletsMutator.process().get(getExtent());
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
@@ -159,7 +160,7 @@ public class CommitCompaction extends ManagerRepo {
private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm,
Ample.ConditionalTabletMutator tabletMutator) {
- // ELASTICITY_TODO improve logging adapt to use existing tablet files logging
+
if (ecm.getKind() == CompactionKind.USER) {
if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
// all files selected for the user compactions are finished, so the tablet is finish and
@@ -171,8 +172,7 @@ public class CommitCompaction extends ManagerRepo {
"Tablet %s unexpected has selected files and compacted columns for %s",
tablet.getExtent(), fateId);
- // TODO set to trace
- LOG.debug("All selected files compacted for {} setting compacted for {}",
+ LOG.trace("All selected files compacted for {} setting compacted for {}",
tablet.getExtent(), tablet.getSelectedFiles().getFateId());
tabletMutator.deleteSelectedFiles();
@@ -187,14 +187,12 @@ public class CommitCompaction extends ManagerRepo {
newSelectedFileSet.removeAll(ecm.getJobFiles());
if (newDatafile.isPresent()) {
- // TODO set to trace
- LOG.debug(
+ LOG.trace(
"Not all selected files for {} are done, adding new selected file {} from compaction",
tablet.getExtent(), newDatafile.orElseThrow().getPath().getName());
newSelectedFileSet.add(newDatafile.orElseThrow().insert());
} else {
- // TODO set to trace
- LOG.debug(
+ LOG.trace(
"Not all selected files for {} are done, compaction produced no output so not adding to selected set.",
tablet.getExtent());
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index d336358700..1689ae4359 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.accumulo.core.clientImpl.bulk.Bulk;
@@ -44,9 +45,11 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -67,6 +70,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
/**
@@ -112,6 +116,7 @@ class LoadFiles extends ManagerRepo {
protected FateId fateId;
protected boolean setTime;
Ample.ConditionalTabletsMutator conditionalMutator;
+ private Map<KeyExtent,List<TabletFile>> loadingFiles;
private long skipped = 0;
@@ -122,6 +127,7 @@ class LoadFiles extends ManagerRepo {
this.setTime = setTime;
conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets();
this.skipped = 0;
+ this.loadingFiles = new HashMap<>();
}
void load(List<TabletMetadata> tablets, Files files) {
@@ -213,6 +219,11 @@ class LoadFiles extends ManagerRepo {
tabletMutator.putTime(tabletTime.getMetadataTime());
}
+ // Hang on to for logging purposes in the case where the update is a
+ // success.
+ Preconditions.checkState(
+ loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null);
+
tabletMutator.submit(tm -> false);
}
}
@@ -279,30 +290,29 @@ class LoadFiles extends ManagerRepo {
long finish() {
var results = conditionalMutator.process();
- boolean allDone =
- results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED)
- && skipped == 0;
-
- long sleepTime = 0;
- if (!allDone) {
- sleepTime = 1000;
-
- results.forEach((extent, condResult) -> {
- if (condResult.getStatus() != Status.ACCEPTED) {
- var metadata = condResult.readMetadata();
- if (metadata == null) {
- log.debug("Tablet update failed, tablet is gone {} {} {}", fateId, extent,
- condResult.getStatus());
- } else {
- log.debug("Tablet update failed {} {} {} {} {} {}", fateId, extent,
- condResult.getStatus(), metadata.getOperationId(), metadata.getLocation(),
- metadata.getLoaded());
- }
+ AtomicBoolean seenFailure = new AtomicBoolean(false);
+ results.forEach((extent, condResult) -> {
+ if (condResult.getStatus() == Status.ACCEPTED) {
+ loadingFiles.get(extent).forEach(file -> TabletLogger.bulkImported(extent, file));
+ } else {
+ seenFailure.set(true);
+ var metadata = condResult.readMetadata();
+ if (metadata == null) {
+ log.debug("Tablet update failed, tablet is gone {} {} {}", fateId, extent,
+ condResult.getStatus());
+ } else {
+ log.debug("Tablet update failed {} {} {} {} {} {}", fateId, extent,
+ condResult.getStatus(), metadata.getOperationId(), metadata.getLocation(),
+ metadata.getLoaded());
}
- });
- }
+ }
+ });
- return sleepTime;
+ if (seenFailure.get() || skipped != 0) {
+ return 1000;
+ } else {
+ return 0;
+ }
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 200a48a089..d9424b68fe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -346,8 +346,6 @@ class CompactionDriver extends ManagerRepo {
private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Exception {
var ample = manager.getContext().getAmple();
- // ELASTICITY_TODO use existing compaction logging
-
boolean allCleanedUp = false;
Retry retry =
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))