This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/hive.git
commit d24764a674ac6efae26414c10e4c8996967b7f7f
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Mon Mar 4 23:26:40 2024 +0530

    HIVE-28102: Iceberg: Invoke validateDataFilesExist for RowDelta operations. (#5111). (Ayush Saxena, reviewed by Denys Kuzmenko)

    (cherry picked from commit 41bf5d55f6ca9bc2ab6af2f3fc34cc64c7b26f01)
---
 .../org/apache/iceberg/mr/hive/FilesForCommit.java | 29 +++++++++++++++------
 .../mr/hive/HiveIcebergOutputCommitter.java        | 30 ++++++++++++++--------
 .../writer/HiveIcebergCopyOnWriteRecordWriter.java |  8 +++---
 .../mr/hive/writer/HiveIcebergDeleteWriter.java    |  4 ++-
 .../hive/writer/TestHiveIcebergDeleteWriter.java   |  8 ++++++
 5 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 1bc5ea3a674..2e25f5a8c2e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -33,29 +33,37 @@ public class FilesForCommit implements Serializable {
 
   private final Collection<DataFile> dataFiles;
   private final Collection<DeleteFile> deleteFiles;
-  private Collection<DataFile> referencedDataFiles;
+  private final Collection<DataFile> replacedDataFiles;
+  private final Collection<CharSequence> referencedDataFiles;
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
     this(dataFiles, deleteFiles, Collections.emptyList());
   }
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
-      Collection<DataFile> referencedDataFiles) {
+      Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles) {
     this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
+    this.replacedDataFiles = replacedDataFiles;
     this.referencedDataFiles = referencedDataFiles;
   }
 
-  public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles) {
-    return new FilesForCommit(Collections.emptyList(), deleteFiles);
+  public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
+      Collection<DataFile> replacedDataFiles) {
+    this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
+  }
+
+  public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
+      Collection<CharSequence> referencedDataFiles) {
+    return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles);
   }
 
   public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
     return new FilesForCommit(dataFiles, Collections.emptyList());
   }
 
-  public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> referencedDataFiles) {
-    return new FilesForCommit(dataFiles, Collections.emptyList(), referencedDataFiles);
+  public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> replacedDataFiles) {
+    return new FilesForCommit(dataFiles, Collections.emptyList(), replacedDataFiles);
   }
 
   public static FilesForCommit empty() {
@@ -70,7 +78,11 @@ public class FilesForCommit implements Serializable {
     return deleteFiles;
   }
 
-  public Collection<DataFile> referencedDataFiles() {
+  public Collection<DataFile> replacedDataFiles() {
+    return replacedDataFiles;
+  }
+
+  public Collection<CharSequence> referencedDataFiles() {
     return referencedDataFiles;
   }
 
@@ -79,7 +91,7 @@ public class FilesForCommit implements Serializable {
   }
 
   public boolean isEmpty() {
-    return dataFiles.isEmpty() && deleteFiles.isEmpty() && referencedDataFiles.isEmpty();
+    return dataFiles.isEmpty() && deleteFiles.isEmpty() && replacedDataFiles.isEmpty();
   }
 
   @Override
@@ -87,6 +99,7 @@ public class FilesForCommit implements Serializable {
     return MoreObjects.toStringHelper(this)
         .add("dataFiles", dataFiles.toString())
         .add("deleteFiles", deleteFiles.toString())
+        .add("replacedDataFiles", replacedDataFiles.toString())
         .add("referencedDataFiles", referencedDataFiles.toString())
         .toString();
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index ba64faa6188..7f4b9e12c3a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -79,6 +79,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -142,16 +143,18 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
           attemptID.getJobID(), attemptID.getTaskID().getId());
       if (writers.get(output) != null) {
-        Collection<DataFile> dataFiles = Lists.newArrayList();
-        Collection<DeleteFile> deleteFiles = Lists.newArrayList();
-        Collection<DataFile> referencedDataFiles = Lists.newArrayList();
+        List<DataFile> dataFiles = Lists.newArrayList();
+        List<DeleteFile> deleteFiles = Lists.newArrayList();
+        List<DataFile> replacedDataFiles = Lists.newArrayList();
+        Set<CharSequence> referencedDataFiles = Sets.newHashSet();
         for (HiveIcebergWriter writer : writers.get(output)) {
           FilesForCommit files = writer.files();
           dataFiles.addAll(files.dataFiles());
           deleteFiles.addAll(files.deleteFiles());
+          replacedDataFiles.addAll(files.replacedDataFiles());
           referencedDataFiles.addAll(files.referencedDataFiles());
         }
-        createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles),
+        createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles),
             fileForCommitLocation, table.io());
       } else {
         LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
@@ -412,7 +415,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     }
     List<DataFile> dataFiles = Lists.newArrayList();
     List<DeleteFile> deleteFiles = Lists.newArrayList();
-    List<DataFile> referencedDataFiles = Lists.newArrayList();
+    List<DataFile> replacedDataFiles = Lists.newArrayList();
+    Set<CharSequence> referencedDataFiles = Sets.newHashSet();
 
     Table table = null;
     String branchName = null;
@@ -439,10 +443,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           numTasks, executor, outputTable.table.location(), jobContext, io, true);
       dataFiles.addAll(writeResults.dataFiles());
       deleteFiles.addAll(writeResults.deleteFiles());
+      replacedDataFiles.addAll(writeResults.replacedDataFiles());
      referencedDataFiles.addAll(writeResults.referencedDataFiles());
     }
 
-    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
+    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
 
     long startTime = System.currentTimeMillis();
     if (Operation.IOW != operation) {
@@ -485,9 +490,9 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
 
   private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
       FilesForCommit results, Operation operation) {
-    if (!results.referencedDataFiles().isEmpty()) {
+    if (!results.replacedDataFiles().isEmpty()) {
       OverwriteFiles write = table.newOverwrite();
-      results.referencedDataFiles().forEach(write::deleteFile);
+      results.replacedDataFiles().forEach(write::deleteFile);
       results.dataFiles().forEach(write::addFile);
 
       if (StringUtils.isNotEmpty(branchName)) {
@@ -497,6 +502,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
         write.validateFromSnapshot(snapshotId);
       }
       write.validateNoConflictingData();
+      write.validateNoConflictingDeletes();
       write.commit();
       return;
     }
@@ -523,6 +529,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       write.validateDeletedFiles();
       write.validateNoConflictingDeleteFiles();
     }
+    write.validateDataFilesExist(results.referencedDataFiles());
     write.validateNoConflictingDataFiles();
     write.commit();
   }
@@ -660,7 +667,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     // starting from 0.
     Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
     Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
-    Collection<DataFile> referencedDataFiles = new ConcurrentLinkedQueue<>();
+    Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
+    Collection<CharSequence> referencedDataFiles = new ConcurrentLinkedQueue<>();
     Tasks.range(numTasks)
         .throwFailureWhenFinished(throwOnFailure)
         .executeWith(executor)
@@ -670,11 +678,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           FilesForCommit files = readFileForCommit(taskFileName, io);
           dataFiles.addAll(files.dataFiles());
           deleteFiles.addAll(files.deleteFiles());
+          replacedDataFiles.addAll(files.replacedDataFiles());
           referencedDataFiles.addAll(files.referencedDataFiles());
-
         });
-    return new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
+    return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
   }
 
   /**
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
index f13f13ec046..142f73c8549 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
@@ -45,7 +45,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
 
   private final int currentSpecId;
   private final GenericRecord rowDataTemplate;
-  private final List<DataFile> referencedDataFiles;
+  private final List<DataFile> replacedDataFiles;
 
   HiveIcebergCopyOnWriteRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
@@ -54,7 +54,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
         new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
     this.currentSpecId = currentSpecId;
     this.rowDataTemplate = GenericRecord.create(schema);
-    this.referencedDataFiles = Lists.newArrayList();
+    this.replacedDataFiles = Lists.newArrayList();
   }
 
   @Override
@@ -72,7 +72,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
           .withFileSizeInBytes(0)
          .withRecordCount(0)
          .build();
-      referencedDataFiles.add(dataFile);
+      replacedDataFiles.add(dataFile);
     } else {
       writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
     }
@@ -81,6 +81,6 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
-    return FilesForCommit.onlyData(dataFiles, referencedDataFiles);
+    return FilesForCommit.onlyData(dataFiles, replacedDataFiles);
   }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
index bd61f101cd9..6753ffa46c2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive.writer;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
@@ -68,6 +69,7 @@ class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
   @Override
   public FilesForCommit files() {
     List<DeleteFile> deleteFiles = ((DeleteWriteResult) writer.result()).deleteFiles();
-    return FilesForCommit.onlyDelete(deleteFiles);
+    Set<CharSequence> referencedDataFiles = ((DeleteWriteResult) writer.result()).referencedDataFiles();
+    return FilesForCommit.onlyDelete(deleteFiles, referencedDataFiles);
   }
 }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
index 9cac3a02620..a2b6ee5fe63 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,18 +56,24 @@ public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
     Collections.sort(deleteRecords,
         Comparator.comparing(a -> a.getField(MetadataColumns.PARTITION_COLUMN_NAME).toString()));
 
+    CharSequenceSet expectedDataFiles = CharSequenceSet.empty();
     Container<Record> container = new Container<>();
     for (Record deleteRecord : deleteRecords) {
       container.set(deleteRecord);
       testWriter.write(container);
+      expectedDataFiles.add((String) deleteRecord.getField(MetadataColumns.FILE_PATH.name()));
     }
 
     testWriter.close(false);
 
     RowDelta rowDelta = table.newRowDelta();
     testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
+    Collection<CharSequence> actualDataFiles = testWriter.files().referencedDataFiles();
     rowDelta.commit();
 
+    Assert.assertTrue("Actual :" + actualDataFiles + " Expected: " + expectedDataFiles,
+        actualDataFiles.containsAll(expectedDataFiles));
+
     StructLikeSet expected = rowSetWithoutIds(RECORDS, DELETED_IDS);
     StructLikeSet actual = actualRowSet(table);
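
For readers less familiar with the Iceberg API touched here, below is a minimal, self-contained sketch (not part of the patch) of the RowDelta commit pattern this change adopts in commitWrite(): the data file paths referenced by the position deletes are passed to RowDelta.validateDataFilesExist(), so a concurrent overwrite or compaction that removed any of those files fails the commit instead of publishing deletes that point at missing files. The class, method, and parameter names below (RowDeltaValidationSketch, commitDeletes, snapshotId) are hypothetical stand-ins for the values that FilesForCommit carries through the committer.

// Illustrative sketch only: names and parameters are hypothetical stand-ins,
// not code from the patch above.
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.CharSequenceSet;

public final class RowDeltaValidationSketch {

  private RowDeltaValidationSketch() {
  }

  // Commits position deletes and asks Iceberg to verify that every data file
  // referenced by those deletes still exists in the table at commit time.
  static void commitDeletes(Table table, Iterable<DeleteFile> deleteFiles,
      CharSequenceSet referencedDataFiles, Long snapshotId) {
    RowDelta rowDelta = table.newRowDelta();
    deleteFiles.forEach(rowDelta::addDeletes);

    if (snapshotId != null) {
      // Only validate against changes committed after the snapshot the writers read.
      rowDelta.validateFromSnapshot(snapshotId);
    }

    // Fail the commit if a referenced data file was rewritten or removed by a
    // concurrent operation, rather than committing deletes against missing files.
    rowDelta.validateDataFilesExist(referencedDataFiles);
    rowDelta.validateNoConflictingDataFiles();

    rowDelta.commit();
  }
}

In the committer itself the equivalent validations run inside commitWrite(), with the referenced paths collected from each task's DeleteWriteResult via HiveIcebergDeleteWriter.files().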