This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ef174d3aa IMPALA-12162: Checksum files before lock in INSERT
ef174d3aa is described below
commit ef174d3aa5405043fa5084cac83bafcdc1afd473
Author: Michael Smith <[email protected]>
AuthorDate: Thu May 8 16:13:11 2025 -0700
IMPALA-12162: Checksum files before lock in INSERT
Collect file metadata - file checksums and ACID directory path - before
acquiring the table lock. Table lock doesn't prevent files from being
deleted from the underlying filesystem, and these operations can take
time, blocking other operations that depend on the table lock.
If errors occur while collecting the checksum or acidDirPath for individual
files, InsertEvents are still fired with the partial data that was gathered,
providing best-effort information. Hive defaults these values to the empty
string when they are not specified.
IMPALA-10254 has been resolved, so this change removes the exception for
FeIcebergTable and the associated TODO.
Change-Id: I18f9686f5d53cf1e7c384684c25427fb5353e2af
Reviewed-on: http://gerrit.cloudera.org:8080/22871
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/service/CatalogOpExecutor.java | 129 +++++++++++++--------
1 file changed, 82 insertions(+), 47 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1ce5bd01a..9e1683098 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7342,6 +7342,11 @@ public class CatalogOpExecutor {
throw new InternalException("Unexpected table type: " +
update.getTarget_table());
}
+ final FeFsTable feFsTable = (FeFsTable) table;
+
+ // Collect file checksums (and ACID dir path) before taking table lock.
+ Map<String, List<FileMetadata>> fileMetadata = getFileMetadata(
+ feFsTable, update.getUpdated_partitions(), catalogTimeline);
tryWriteLock(table, "updating the catalog", catalogTimeline);
final Timer.Context context
@@ -7392,7 +7397,7 @@ public class CatalogOpExecutor {
.build());
}
}
- Collection<? extends FeFsPartition> parts =
((FeFsTable)table).loadAllPartitions();
+ Collection<? extends FeFsPartition> parts =
feFsTable.loadAllPartitions();
List<FeFsPartition> affectedExistingPartitions = new ArrayList<>();
List<org.apache.hadoop.hive.metastore.api.Partition>
hmsPartitionsStatsUnset =
Lists.newArrayList();
@@ -7454,7 +7459,7 @@ public class CatalogOpExecutor {
// Before commit fire insert events if external event processing is
// enabled. This is best-effort. Any errors in it should not fail the
INSERT.
try {
- createInsertEvents((FeFsTable) table, update.getUpdated_partitions(),
+ createInsertEvents(feFsTable, fileMetadata,
addedPartitionNames, update.is_overwrite, tblTxn, catalogTimeline);
} catch (Exception e) {
LOG.error("Failed to fire insert events for table {}",
table.getFullName(), e);
@@ -7709,14 +7714,13 @@ public class CatalogOpExecutor {
* does, see:
*
https://github.com/apache/hive/blob/25892ea409/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L3251
* @param table The target table.
- * @param updatedPartitions All affected partitions with the list of new
files
- * inserted.
+ * @param updatedPartitions All affected partitions with FileMetadata for
new files.
* @param addedPartitionNames List of new partitions created during the
insert.
* @param isInsertOverwrite Indicates if the operation was an insert
overwrite.
* @param tblTxn Contains the transactionId and the writeId for the insert.
*/
private void createInsertEvents(FeFsTable table,
- Map<String, TUpdatedPartition> updatedPartitions,
+ Map<String, List<FileMetadata>> updatedPartitions,
Map<String, List<String>> addedPartitionNames,
boolean isInsertOverwrite, TblTransaction tblTxn, EventSequence
catalogTimeline)
throws CatalogException, MetaException {
@@ -7796,11 +7800,11 @@ public class CatalogOpExecutor {
*/
private void prepareInsertEventData(FeFsTable table,
String partName, List<String> partVals,
- Map<String, TUpdatedPartition> updatedPartitions,
+ Map<String, List<FileMetadata>> updatedPartitions,
boolean isInsertOverwrite, boolean isPartitioned,
List<InsertEventRequestData> insertEventReqDatas,
- List<List<String>> insertEventPartVals) throws CatalogException {
- List<String> newFiles = updatedPartitions.get(partName).getFiles();
+ List<List<String>> insertEventPartVals) {
+ List<FileMetadata> newFiles = updatedPartitions.get(partName);
if (!newFiles.isEmpty() || isInsertOverwrite) {
LOG.info("{} new files detected for table {}{}",
newFiles.size(), table.getFullName(),
@@ -7843,50 +7847,81 @@ public class CatalogOpExecutor {
return BackendConfig.INSTANCE.isInsertEventsEnabled();
}
+ private class FileMetadata {
+ String filename;
+ FileChecksum checksum;
+ String acidDirPath;
+
+ FileMetadata(String filename, FileChecksum checksum, String acidDirPath) {
+ this.filename = filename;
+ this.checksum = checksum;
+ this.acidDirPath = acidDirPath;
+ }
+
+ String getChecksum() {
+ return checksum == null ? "" :
+ StringUtils.byteToHexString(checksum.getBytes(), 0,
checksum.getLength());
+ }
+ }
+
+ /**
+ * Returns a map of partition name to list of file metadata: name, checksum,
and
+ * acidDirPath. Logs errors on individual files.
+ */
+ private Map<String, List<FileMetadata>> getFileMetadata(FeFsTable table,
+ Map<String, TUpdatedPartition> updatedPartitions, EventSequence
catalogTimeline) {
+ if (!shouldGenerateInsertEvents(table)) return null;
+ boolean isPartitioned = table.isPartitioned();
+ boolean isTransactional = AcidUtils.isTransactionalTable(table);
+
+ // Get table file system with table location.
+ FileSystem tableFs = null;
+ try {
+ tableFs = table.getFileSystem();
+ } catch (CatalogException e) {
+ LOG.warn("Failed to get FileSystem for table {}", table.getFullName(),
e);
+ }
+
+ Map<String, List<FileMetadata>> fileMetadata = Maps.newHashMap();
+ for (Map.Entry<String, TUpdatedPartition> e :
updatedPartitions.entrySet()) {
+ List<FileMetadata> files = new
ArrayList<>(e.getValue().getFiles().size());
+ for (String file : e.getValue().getFiles()) {
+ FileChecksum checksum = null;
+ String acidDirPath = null;
+ try {
+ Path filePath = new Path(file);
+ FileSystem fs = (isPartitioned || tableFs == null) ?
+ FeFsTable.getFileSystem(filePath) : tableFs;
+ checksum = fs.getFileChecksum(filePath);
+ if (isTransactional) {
+ acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs);
+ }
+ } catch (CatalogException | IOException ex) {
+ LOG.error("Failed to collect insert metadata for {} in table {}",
+ file, table.getFullName(), ex);
+ }
+ files.add(new FileMetadata(file, checksum, acidDirPath));
+ }
+ fileMetadata.put(e.getKey(), files);
+ }
+ catalogTimeline.markEvent("Collected file checksums");
+ return fileMetadata;
+ }
+
private InsertEventRequestData makeInsertEventData(FeFsTable tbl,
List<String> partVals,
- List<String> newFiles, boolean isInsertOverwrite) throws
CatalogException {
+ List<FileMetadata> newFiles, boolean isInsertOverwrite) {
Preconditions.checkNotNull(newFiles);
Preconditions.checkNotNull(partVals);
InsertEventRequestData insertEventRequestData = new InsertEventRequestData(
- Lists.newArrayListWithCapacity(
- newFiles.size()));
+ new ArrayList<>(newFiles.size()));
boolean isTransactional = AcidUtils.isTransactionalTable(tbl);
- // in case of unpartitioned table, partVals will be empty
- boolean isPartitioned = !partVals.isEmpty();
- if (isPartitioned) {
- MetastoreShim.setPartitionVal(insertEventRequestData, partVals);
- }
- // Get table file system with table location.
- FileSystem tableFs = tbl.getFileSystem();
- FileSystem fs;
- for (String file : newFiles) {
- try {
- Path filePath = new Path(file);
- if (!isPartitioned) {
- fs = tableFs;
- } else {
- // Partitions may be in different file systems.
- fs = FeFsTable.getFileSystem(filePath);
- }
- FileChecksum checkSum = fs.getFileChecksum(filePath);
- String checksumStr = checkSum == null ? ""
- : StringUtils.byteToHexString(checkSum.getBytes(), 0,
checkSum.getLength());
- insertEventRequestData.addToFilesAdded(file);
- insertEventRequestData.addToFilesAddedChecksum(checksumStr);
- if (isTransactional) {
- String acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath,
fs);
- if (acidDirPath != null) {
- MetastoreShim.addToSubDirectoryList(insertEventRequestData,
acidDirPath);
- }
- }
- insertEventRequestData.setReplace(isInsertOverwrite);
- } catch (IOException e) {
- if (tbl instanceof FeIcebergTable) {
- // TODO IMPALA-10254: load data files via Iceberg API. Currently we
load
- // Iceberg data files via file listing, so we might see files being
written.
- continue;
- }
- throw new CatalogException("Could not get the file checksum for file "
+ file, e);
+ MetastoreShim.setPartitionVal(insertEventRequestData, partVals);
+ insertEventRequestData.setReplace(isInsertOverwrite);
+ for (FileMetadata metadata : newFiles) {
+ insertEventRequestData.addToFilesAdded(metadata.filename);
+ insertEventRequestData.addToFilesAddedChecksum(metadata.getChecksum());
+ if (isTransactional) {
+ MetastoreShim.addToSubDirectoryList(insertEventRequestData,
metadata.acidDirPath);
}
}
return insertEventRequestData;