This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/hive.git

commit d24764a674ac6efae26414c10e4c8996967b7f7f
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Mon Mar 4 23:26:40 2024 +0530

    HIVE-28102: Iceberg: Invoke validateDataFilesExist for RowDelta operations. (#5111). (Ayush Saxena, reviewed by Denys Kuzmenko)
    
    (cherry picked from commit 41bf5d55f6ca9bc2ab6af2f3fc34cc64c7b26f01)
---
 .../org/apache/iceberg/mr/hive/FilesForCommit.java | 29 +++++++++++++++------
 .../mr/hive/HiveIcebergOutputCommitter.java        | 30 ++++++++++++++--------
 .../writer/HiveIcebergCopyOnWriteRecordWriter.java |  8 +++---
 .../mr/hive/writer/HiveIcebergDeleteWriter.java    |  4 ++-
 .../hive/writer/TestHiveIcebergDeleteWriter.java   |  8 ++++++
 5 files changed, 55 insertions(+), 24 deletions(-)
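
What the fix guards against: a RowDelta commit publishes position-delete files that reference data files by path, and previously nothing checked at commit time that those files still existed. A concurrent rewrite (e.g. a compaction) could remove a referenced file, leaving committed deletes that point at nothing. A minimal sketch of the now-validated commit against the Iceberg core API (table, posDeleteFile and snapshotIdAtReadTime are assumed inputs; the path literal is illustrative):

    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

    class RowDeltaValidationSketch {
      static void commitDeletes(Table table, DeleteFile posDeleteFile, long snapshotIdAtReadTime) {
        RowDelta rowDelta = table.newRowDelta()
            .addDeletes(posDeleteFile)
            .validateFromSnapshot(snapshotIdAtReadTime)
            // Fails the commit with ValidationException if any listed file was
            // removed after snapshotIdAtReadTime, instead of committing deletes
            // that point at a vanished data file.
            .validateDataFilesExist(ImmutableList.of("s3://bucket/db/t/data-00001.parquet"));
        rowDelta.commit();
      }
    }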

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 1bc5ea3a674..2e25f5a8c2e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -33,29 +33,37 @@ public class FilesForCommit implements Serializable {
 
   private final Collection<DataFile> dataFiles;
   private final Collection<DeleteFile> deleteFiles;
-  private Collection<DataFile> referencedDataFiles;
+  private final Collection<DataFile> replacedDataFiles;
+  private final Collection<CharSequence> referencedDataFiles;
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
     this(dataFiles, deleteFiles, Collections.emptyList());
   }
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
-                        Collection<DataFile> referencedDataFiles) {
+      Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles) {
     this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
+    this.replacedDataFiles = replacedDataFiles;
     this.referencedDataFiles = referencedDataFiles;
   }
 
-  public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles) {
-    return new FilesForCommit(Collections.emptyList(), deleteFiles);
+  public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
+      Collection<DataFile> replacedDataFiles) {
+    this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
+  }
+
+  public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
+      Collection<CharSequence> referencedDataFiles) {
+    return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles);
   }
 
   public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
     return new FilesForCommit(dataFiles, Collections.emptyList());
   }
 
-  public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> referencedDataFiles) {
-    return new FilesForCommit(dataFiles, Collections.emptyList(), referencedDataFiles);
+  public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> replacedDataFiles) {
+    return new FilesForCommit(dataFiles, Collections.emptyList(), replacedDataFiles);
   }
 
   public static FilesForCommit empty() {
@@ -70,7 +78,11 @@ public class FilesForCommit implements Serializable {
     return deleteFiles;
   }
 
-  public Collection<DataFile> referencedDataFiles() {
+  public Collection<DataFile> replacedDataFiles() {
+    return replacedDataFiles;
+  }
+
+  public Collection<CharSequence> referencedDataFiles() {
     return referencedDataFiles;
   }
 
@@ -79,7 +91,7 @@ public class FilesForCommit implements Serializable {
   }
 
   public boolean isEmpty() {
-    return dataFiles.isEmpty() && deleteFiles.isEmpty() && referencedDataFiles.isEmpty();
+    return dataFiles.isEmpty() && deleteFiles.isEmpty() && replacedDataFiles.isEmpty();
   }
 
   @Override
@@ -87,6 +99,7 @@ public class FilesForCommit implements Serializable {
     return MoreObjects.toStringHelper(this)
         .add("dataFiles", dataFiles.toString())
         .add("deleteFiles", deleteFiles.toString())
+        .add("replacedDataFiles", replacedDataFiles.toString())
         .add("referencedDataFiles", referencedDataFiles.toString())
         .toString();
   }
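
Net effect in this file: the old dual-purpose referencedDataFiles field is split. replacedDataFiles keeps the copy-on-write meaning (whole DataFile entries to be swapped out via OverwriteFiles), while referencedDataFiles now holds the paths (CharSequence) that position deletes point at, feeding RowDelta#validateDataFilesExist. A hedged sketch of which writer produces which shape (assumed to sit in the same package as FilesForCommit; variable names are illustrative):

    import java.util.List;
    import java.util.Set;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;

    class FilesForCommitShapes {
      // Copy-on-write: rewritten files plus the DataFiles they replace.
      static FilesForCommit copyOnWrite(List<DataFile> rewritten, List<DataFile> replaced) {
        return FilesForCommit.onlyData(rewritten, replaced);
      }

      // Merge-on-read: position-delete files plus the paths they reference.
      static FilesForCommit mergeOnRead(List<DeleteFile> deletes, Set<CharSequence> referencedPaths) {
        return FilesForCommit.onlyDelete(deletes, referencedPaths);
      }
    }
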
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index ba64faa6188..7f4b9e12c3a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -79,6 +79,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -142,16 +143,18 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
               String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
                   attemptID.getJobID(), attemptID.getTaskID().getId());
               if (writers.get(output) != null) {
-                Collection<DataFile> dataFiles = Lists.newArrayList();
-                Collection<DeleteFile> deleteFiles = Lists.newArrayList();
-                Collection<DataFile> referencedDataFiles = Lists.newArrayList();
+                List<DataFile> dataFiles = Lists.newArrayList();
+                List<DeleteFile> deleteFiles = Lists.newArrayList();
+                List<DataFile> replacedDataFiles = Lists.newArrayList();
+                Set<CharSequence> referencedDataFiles = Sets.newHashSet();
                 for (HiveIcebergWriter writer : writers.get(output)) {
                   FilesForCommit files = writer.files();
                   dataFiles.addAll(files.dataFiles());
                   deleteFiles.addAll(files.deleteFiles());
+                  replacedDataFiles.addAll(files.replacedDataFiles());
                   referencedDataFiles.addAll(files.referencedDataFiles());
                 }
-                createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles),
+                createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles),
                     fileForCommitLocation, table.io());
               } else {
                 LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
@@ -412,7 +415,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     }
     List<DataFile> dataFiles = Lists.newArrayList();
     List<DeleteFile> deleteFiles = Lists.newArrayList();
-    List<DataFile> referencedDataFiles = Lists.newArrayList();
+    List<DataFile> replacedDataFiles = Lists.newArrayList();
+    Set<CharSequence> referencedDataFiles = Sets.newHashSet();
 
     Table table = null;
     String branchName = null;
@@ -439,10 +443,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           numTasks, executor, outputTable.table.location(), jobContext, io, true);
       dataFiles.addAll(writeResults.dataFiles());
       deleteFiles.addAll(writeResults.deleteFiles());
+      replacedDataFiles.addAll(writeResults.replacedDataFiles());
       referencedDataFiles.addAll(writeResults.referencedDataFiles());
     }
 
-    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
+    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
     long startTime = System.currentTimeMillis();
 
     if (Operation.IOW != operation) {
@@ -485,9 +490,9 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
   private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
       FilesForCommit results, Operation operation) {
 
-    if (!results.referencedDataFiles().isEmpty()) {
+    if (!results.replacedDataFiles().isEmpty()) {
       OverwriteFiles write = table.newOverwrite();
-      results.referencedDataFiles().forEach(write::deleteFile);
+      results.replacedDataFiles().forEach(write::deleteFile);
       results.dataFiles().forEach(write::addFile);
 
       if (StringUtils.isNotEmpty(branchName)) {
@@ -497,6 +502,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
         write.validateFromSnapshot(snapshotId);
       }
       write.validateNoConflictingData();
+      write.validateNoConflictingDeletes();
       write.commit();
       return;
     }
@@ -523,6 +529,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
         write.validateDeletedFiles();
         write.validateNoConflictingDeleteFiles();
       }
+      write.validateDataFilesExist(results.referencedDataFiles());
       write.validateNoConflictingDataFiles();
       write.commit();
     }
@@ -660,7 +667,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     // starting from 0.
     Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
     Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
-    Collection<DataFile> referencedDataFiles = new ConcurrentLinkedQueue<>();
+    Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
+    Collection<CharSequence> referencedDataFiles = new ConcurrentLinkedQueue<>();
     Tasks.range(numTasks)
         .throwFailureWhenFinished(throwOnFailure)
         .executeWith(executor)
@@ -670,11 +678,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           FilesForCommit files = readFileForCommit(taskFileName, io);
           dataFiles.addAll(files.dataFiles());
           deleteFiles.addAll(files.deleteFiles());
+          replacedDataFiles.addAll(files.replacedDataFiles());
           referencedDataFiles.addAll(files.referencedDataFiles());
-
         });
 
-    return new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
+    return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
   }
 
   /**
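
Taken together, commitWrite now branches on the two collections. A condensed restatement of the hunks above (branch/snapshot wiring, IOW handling and error paths omitted; FilesForCommit is assumed to be on the classpath):

    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;

    class CommitWriteSketch {
      static void commit(Table table, FilesForCommit results) {
        if (!results.replacedDataFiles().isEmpty()) {
          // Copy-on-write: atomically swap replaced files for their rewrites.
          OverwriteFiles overwrite = table.newOverwrite();
          results.replacedDataFiles().forEach(overwrite::deleteFile);
          results.dataFiles().forEach(overwrite::addFile);
          overwrite.validateNoConflictingData();
          overwrite.validateNoConflictingDeletes();
          overwrite.commit();
        } else {
          // Merge-on-read: publish new rows and deletes, refusing to commit
          // if a referenced data file disappeared (the HIVE-28102 fix).
          RowDelta rowDelta = table.newRowDelta();
          results.dataFiles().forEach(rowDelta::addRows);
          results.deleteFiles().forEach(rowDelta::addDeletes);
          rowDelta.validateDataFilesExist(results.referencedDataFiles());
          rowDelta.validateNoConflictingDataFiles();
          rowDelta.commit();
        }
      }
    }
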
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
index f13f13ec046..142f73c8549 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
@@ -45,7 +45,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
   private final int currentSpecId;
 
   private final GenericRecord rowDataTemplate;
-  private final List<DataFile> referencedDataFiles;
+  private final List<DataFile> replacedDataFiles;
 
   HiveIcebergCopyOnWriteRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
@@ -54,7 +54,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
         new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
     this.currentSpecId = currentSpecId;
     this.rowDataTemplate = GenericRecord.create(schema);
-    this.referencedDataFiles = Lists.newArrayList();
+    this.replacedDataFiles = Lists.newArrayList();
   }
 
   @Override
@@ -72,7 +72,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
             .withFileSizeInBytes(0)
             .withRecordCount(0)
             .build();
-      referencedDataFiles.add(dataFile);
+      replacedDataFiles.add(dataFile);
     } else {
       writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
     }
@@ -81,6 +81,6 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
-    return FilesForCommit.onlyData(dataFiles, referencedDataFiles);
+    return FilesForCommit.onlyData(dataFiles, replacedDataFiles);
   }
 }
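
The rename above matches what this writer actually records: on seeing a delete marker it builds a stub DataFile carrying only the path of the file being replaced, which commitWrite later hands to OverwriteFiles#deleteFile. A rough sketch of that bookkeeping (an unpartitioned spec and an explicit format are assumed here; the real writer derives the path from the row's metadata columns):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.PartitionSpec;

    class ReplacedFileStubSketch {
      static DataFile stubFor(PartitionSpec unpartitionedSpec, String originalFilePath) {
        // Only the path matters to OverwriteFiles#deleteFile, so metrics are zeroed.
        return DataFiles.builder(unpartitionedSpec)
            .withPath(originalFilePath)
            .withFormat(FileFormat.PARQUET)  // assumed format for the sketch
            .withFileSizeInBytes(0)
            .withRecordCount(0)
            .build();
      }
    }
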
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
index bd61f101cd9..6753ffa46c2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive.writer;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
@@ -68,6 +69,7 @@ class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
   @Override
   public FilesForCommit files() {
     List<DeleteFile> deleteFiles = ((DeleteWriteResult) writer.result()).deleteFiles();
-    return FilesForCommit.onlyDelete(deleteFiles);
+    Set<CharSequence> referencedDataFiles = ((DeleteWriteResult) writer.result()).referencedDataFiles();
+    return FilesForCommit.onlyDelete(deleteFiles, referencedDataFiles);
   }
 }
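
The referenced paths come for free from Iceberg's position-delete machinery: each PositionDelete names the data file it targets, and the underlying writer collects the distinct paths into DeleteWriteResult#referencedDataFiles. A small sketch of that flow (writer, spec and partition are assumed to be wired up as in HiveIcebergDeleteWriter; the path and position are illustrative):

    import java.io.IOException;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.deletes.PositionDelete;
    import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
    import org.apache.iceberg.util.CharSequenceSet;

    class ReferencedPathsSketch {
      static CharSequenceSet referencedPaths(ClusteredPositionDeleteWriter<Record> writer,
          PartitionSpec spec, StructLike partition) throws IOException {
        PositionDelete<Record> delete = PositionDelete.create();
        delete.set("s3://bucket/db/t/data-00001.parquet", 42L, null);  // (path, row position, row)
        writer.write(delete, spec, partition);
        writer.close();
        return writer.result().referencedDataFiles();  // distinct paths the deletes point at
      }
    }
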
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
index 9cac3a02620..a2b6ee5fe63 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,18 +56,24 @@ public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
     Collections.sort(deleteRecords,
         Comparator.comparing(a -> a.getField(MetadataColumns.PARTITION_COLUMN_NAME).toString()));
 
+    CharSequenceSet expectedDataFiles = CharSequenceSet.empty();
     Container<Record> container = new Container<>();
     for (Record deleteRecord : deleteRecords) {
       container.set(deleteRecord);
       testWriter.write(container);
+      expectedDataFiles.add((String) deleteRecord.getField(MetadataColumns.FILE_PATH.name()));
     }
 
     testWriter.close(false);
 
     RowDelta rowDelta = table.newRowDelta();
     testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
+    Collection<CharSequence> actualDataFiles = testWriter.files().referencedDataFiles();
     rowDelta.commit();
 
+    Assert.assertTrue("Actual :" + actualDataFiles + " Expected: " + expectedDataFiles,
+        actualDataFiles.containsAll(expectedDataFiles));
+
     StructLikeSet expected = rowSetWithoutIds(RECORDS, DELETED_IDS);
     StructLikeSet actual = actualRowSet(table);
 
