This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 19a9ad9d65 improves searchability of bulk import logging (#5997)
19a9ad9d65 is described below
commit 19a9ad9d6536b6b2a37fc17a4453356388cbf23f
Author: Keith Turner <[email protected]>
AuthorDate: Tue Dec 2 09:35:27 2025 -0800
improves searchability of bulk import logging (#5997)
Compared to bulkV1, the bulkV2 logging is sparser and it's difficult
to track the key pieces of information about a bulk import in the logs.
Added some logging that ties together all the important information
about a bulk import into a single log message that contains the table id,
fate id, source dir, and dest dir. Also added logging to track the
renames done by bulkV2 and which tablets each file was loaded into.
Cleaned up some of the existing logging.
---
.../apache/accumulo/core/logging/BulkLogger.java | 71 ++++++++++++++++++++++
.../accumulo/server/metadata/ServerAmpleImpl.java | 3 +-
.../manager/tableOps/bulkVer2/BulkImportMove.java | 6 +-
.../manager/tableOps/bulkVer2/LoadFiles.java | 12 ++--
.../manager/tableOps/bulkVer2/PrepBulkImport.java | 5 ++
5 files changed, 86 insertions(+), 11 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
new file mode 100644
index 0000000000..90d5552984
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.logging;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This logger tracks significant changes that bulk import v2 operations make
to DFS and tablets.
+ */
+public class BulkLogger {
+ private static final Logger log = LoggerFactory.getLogger(Logging.PREFIX +
"bulk");
+
+ public static void initiating(long fateId, TableId tableId, boolean setTime,
String sourceDir,
+ String destinationDir) {
+ // Log the key pieces of information about a bulk import in a single log
message to tie them all
+ // together.
+ log.info("{} initiating bulk import, tableId:{} setTime:{} source:{}
destination:{}",
+ FateTxId.formatTid(fateId), tableId, setTime, sourceDir,
destinationDir);
+ }
+
+ public static void renamed(String fateId, Path source, Path destination) {
+ // The initiating message logged the full directory paths, so do not need
to repeat that
+ // information here. Log the bulk destination directory as it is unique
and easy to search for.
+ log.debug("{} renamed {} to {}/{}", fateId, source.getName(),
destination.getParent().getName(),
+ destination.getName());
+ }
+
+ /**
+ * This is called when a bulk load operation is cleaning up load entries in
the metadata table.
+ * Turning this on allows seeing what files were loaded into which tablets
by a bulk load
+ * operation. The information logged is redundant with
+ * {@link TabletLogger#bulkImported(KeyExtent, TabletFile)} except that it
will happen on the
+ * manager instead of the tserver.
+ */
+ public static void deletingLoadEntry(Map.Entry<Key,Value> entry) {
+ if (log.isTraceEnabled()) {
+ var key = entry.getKey();
+ // the column qualifier contains the file loaded
+ var path = new Path(key.getColumnQualifierData().toString());
+ // the value is the fate id
+ log.trace("{} loaded {}/{} into {}", entry.getValue(),
path.getParent().getName(),
+ path.getName(), key.getRowData());
+ }
+ }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 857853311c..22cb7a941f 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -48,6 +48,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.gc.GcCandidate;
import org.apache.accumulo.core.gc.ReferenceFile;
+import org.apache.accumulo.core.logging.BulkLogger;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
@@ -199,7 +200,7 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
log.trace("Looking at entry {} with tid {}", entry, tid);
long entryTid = BulkFileColumnFamily.getBulkLoadTid(entry.getValue());
if (tid == entryTid) {
- log.trace("deleting entry {}", entry);
+ BulkLogger.deletingLoadEntry(entry);
Key key = entry.getKey();
Mutation m = new Mutation(key.getRow());
m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
index 72e4923f6d..6eb247b33c 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.logging.BulkLogger;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.manager.Manager;
@@ -75,10 +76,6 @@ class BulkImportMove extends ManagerRepo {
final Path bulkDir = new Path(bulkInfo.bulkDir);
final Path sourceDir = new Path(bulkInfo.sourceDir);
- String fmtTid = FateTxId.formatTid(tid);
-
- log.debug("{} sourceDir {}", fmtTid, sourceDir);
-
VolumeManager fs = manager.getVolumeManager();
if (bulkInfo.tableState == TableState.ONLINE) {
@@ -120,6 +117,7 @@ class BulkImportMove extends ManagerRepo {
}
try {
fs.bulkRename(oldToNewMap, workerCount,
BULK_IMPORT_DIR_MOVE_POOL.poolName, fmtTid);
+ oldToNewMap.forEach((oldPath, newPath) -> BulkLogger.renamed(fmtTid,
oldPath, newPath));
} catch (IOException ioe) {
throw new
AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index c7e828b447..e26db3c4e6 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -108,7 +108,7 @@ class LoadFiles extends ManagerRepo {
@Override
public long isReady(long tid, Manager manager) throws Exception {
- log.info("Starting for {} (tid = {})", bulkInfo.sourceDir,
FateTxId.formatTid(tid));
+ log.trace("Starting for {} (tid = {})", bulkInfo.sourceDir,
FateTxId.formatTid(tid));
if (manager.onlineTabletServers().isEmpty()) {
log.warn("There are no tablet server to process bulkDir import, waiting
(tid = "
+ FateTxId.formatTid(tid) + ")");
@@ -293,10 +293,10 @@ class LoadFiles extends ManagerRepo {
fmtTid, outdatedTservers);
}
- if (log.isDebugEnabled()) {
+ if (log.isTraceEnabled()) {
var recvTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
var tabletStats =
queue.values().stream().mapToInt(Map::size).summaryStatistics();
- log.debug(
+ log.trace(
"{} sent {} messages to {} tablet servers for {} tablets (min:{}
max:{} avg:{} "
+ "tablets per tserver), send time:{}ms recv time:{}ms
{}:{}",
fmtTid, clients.size(), queue.size(), tabletStats.getSum(),
tabletStats.getMin(),
@@ -521,7 +521,7 @@ class LoadFiles extends ManagerRepo {
if (!pi.findWithin(
tm -> PREV_COMP.compare(tm.getPrevEndRow(),
loadMapKey.prevEndRow()) >= 0,
skipDistance)) {
- log.debug(
+ log.trace(
"{}: Next load mapping range {} not found in {} tablets,
recreating TabletMetadata to jump ahead",
fmtTid, loadMapKey.prevEndRow(), skipDistance);
tabletsMetadata.close();
@@ -540,8 +540,8 @@ class LoadFiles extends ManagerRepo {
log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
- if (importTimingStats.callCount > 0) {
- log.debug(
+ if (importTimingStats.callCount > 0 && log.isTraceEnabled()) {
+ log.trace(
"Stats for {} (tid = {}): processed {} tablets in {} calls which
took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}%
of the processing time.",
bulkInfo.sourceDir, FateTxId.formatTid(tid),
importTimingStats.tabletCount,
importTimingStats.callCount, totalProcessingTime.toMillis(),
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index e6371628cc..0c1c5d7473 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.file.FilePrefix;
+import org.apache.accumulo.core.logging.BulkLogger;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.PeekingIterator;
@@ -290,6 +291,10 @@ public class PrepBulkImport extends ManagerRepo {
BulkSerialize.writeRenameMap(oldToNewNameMap, bulkDir.toString(),
fs::create);
bulkInfo.bulkDir = bulkDir.toString();
+
+ BulkLogger.initiating(tid, bulkInfo.tableId, bulkInfo.setTime,
bulkInfo.sourceDir,
+ bulkInfo.bulkDir);
+
// return the next step, which will move files
return new BulkImportMove(bulkInfo);
}