lcspinter commented on code in PR #3204:
URL: https://github.com/apache/hive/pull/3204#discussion_r866544611


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
     });
 
-    Collection<FilesForCommit> writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
-    if (writeResults.isEmpty()) {
-      LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
-          table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
-    } else {
-      if (HiveIcebergStorageHandler.isDelete(conf, name)) {
-        commitDelete(table, Optional.empty(), startTime, writeResults);
+    FilesForCommit writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
+    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+      if (writeResults.isEmpty()) {
+        LOG.info(
+            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
+            table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
       } else {
-        boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false);
-        commitInsert(table, Optional.empty(), startTime, writeResults, isOverwrite);
+        commitWrite(table, startTime, writeResults);
       }
+    } else {
+      commitOverwrite(table, startTime, writeResults);
     }
   }
 
-  private void commitDelete(Table table, Optional<Transaction> txn, long startTime,
-      Collection<FilesForCommit> results) {
-    RowDelta append = txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
-    List<DeleteFile> deleteFiles = results.stream().map(FilesForCommit::deleteFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    deleteFiles.forEach(append::addDeletes);
-    append.commit();
-    LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
-        System.currentTimeMillis() - startTime, table, deleteFiles.size());
-    LOG.debug("Added delete files {}", deleteFiles);
+  private void commitWrite(Table table, long startTime, FilesForCommit results) {

Review Comment:
   nit: please add a short javadoc to this new method.
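   Something along these lines, based on the method body (just a sketch, wording up to you):
   ```java
   /**
    * Creates and commits a change to the table from the collected data and delete files.
    * Uses a plain append when there are no delete files, and a row delta otherwise.
    * @param table The table we are writing to
    * @param startTime The start time of the commit, used for logging
    * @param results The data and delete files collected from the tasks
    */
   private void commitWrite(Table table, long startTime, FilesForCommit results) {
   ```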



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
     });
 
-    Collection<FilesForCommit> writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
-    if (writeResults.isEmpty()) {
-      LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
-          table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
-    } else {
-      if (HiveIcebergStorageHandler.isDelete(conf, name)) {
-        commitDelete(table, Optional.empty(), startTime, writeResults);
+    FilesForCommit writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
+    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+      if (writeResults.isEmpty()) {
+        LOG.info(
+            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
+            table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
       } else {
-        boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false);
-        commitInsert(table, Optional.empty(), startTime, writeResults, isOverwrite);
+        commitWrite(table, startTime, writeResults);
       }
+    } else {
+      commitOverwrite(table, startTime, writeResults);
     }
   }
 
-  private void commitDelete(Table table, Optional<Transaction> txn, long startTime,
-      Collection<FilesForCommit> results) {
-    RowDelta append = txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
-    List<DeleteFile> deleteFiles = results.stream().map(FilesForCommit::deleteFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    deleteFiles.forEach(append::addDeletes);
-    append.commit();
-    LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
-        System.currentTimeMillis() - startTime, table, deleteFiles.size());
-    LOG.debug("Added delete files {}", deleteFiles);
+  private void commitWrite(Table table, long startTime, FilesForCommit results) {
+    if (results.deleteFiles().isEmpty()) {
+      AppendFiles write = table.newAppend();
+      results.dataFiles().forEach(write::appendFile);
+      write.commit();
+    } else {
+      RowDelta write = table.newRowDelta();
+      results.dataFiles().forEach(write::addRows);
+      results.deleteFiles().forEach(write::addDeletes);
+      write.commit();
+    }
+
+    LOG.info("Write commit took {} ms for table: {} with {} data and {} delete file(s)",
+        System.currentTimeMillis() - startTime, table, results.dataFiles().size(), results.deleteFiles().size());
+    LOG.debug("Added files {}", results);
   }
 
-  private void commitInsert(Table table, Optional<Transaction> txn, long startTime,
-      Collection<FilesForCommit> results, boolean isOverwrite) {
-    List<DataFile> dataFiles = results.stream().map(FilesForCommit::dataFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    if (isOverwrite) {
-      if (!dataFiles.isEmpty()) {
-        ReplacePartitions overwrite = txn.map(Transaction::newReplacePartitions).orElse(table.newReplacePartitions());
-        dataFiles.forEach(overwrite::addFile);
-        overwrite.commit();
-        LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
-            table, dataFiles.size());
-      } else if (table.spec().isUnpartitioned()) {
-        DeleteFiles deleteFiles = txn.map(Transaction::newDelete).orElse(table.newDelete());
-        deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
-        deleteFiles.commit();
-        LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. " +
-            "Commit took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
-      }
-      LOG.debug("Overwrote partitions with files {}", dataFiles);
-    } else if (!dataFiles.isEmpty()) {
-      // Appending data files to the table
-      // We only create a new commit if there's something to append
-      AppendFiles append = txn.map(Transaction::newAppend).orElse(table.newAppend());
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Append commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
+  private void commitOverwrite(Table table, long startTime, FilesForCommit results) {

Review Comment:
   nit: please add a javadoc here as well.
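   Assuming the new method keeps the semantics of the old overwrite branch in commitInsert (replace partitions, truncate an unpartitioned table on an empty result), a sketch:
   ```java
   /**
    * Creates and commits an overwrite change to the table.
    * Replaces the affected partitions with the new data files, or clears an unpartitioned table if there are no new files.
    * @param table The table we are writing to
    * @param startTime The start time of the commit, used for logging
    * @param results The data files collected from the tasks
    */
   private void commitOverwrite(Table table, long startTime, FilesForCommit results) {
   ```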



##########
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java:
##########
@@ -3330,6 +3332,23 @@ public static boolean isNonNativeAcidTable(Table table) {
         table.getStorageHandler().supportsAcidOperations() != HiveStorageHandler.AcidSupportType.NONE;
   }
 
+  /**
+   * Returns the virtual columns needed for update queries. For ACID queries it is a single ROW__ID, for non-native
+   * tables the list is provided by the {@link HiveStorageHandler#acidVirtualColumns()}.
+   * @param table The table for which we run the query
+   * @return The list of virtual columns used
+   */
+  public static List<VirtualColumn> getAcidVirtualColumns(Table table) {

Review Comment:
   nit: rename to `getVirtualColumns`
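   i.e. the signature would become:
   ```java
   public static List<VirtualColumn> getVirtualColumns(Table table) {
   ```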



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java:
##########
@@ -115,20 +115,20 @@ public static PositionDelete<Record> getPositionDelete(Record rec, Record rowDat
    * @return The schema for reading files, extended with metadata columns needed for deletes
    */
   public static Schema createFileReadSchemaForUpdate(List<Types.NestedField> dataCols, Table table) {
-    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
-    SERDE_META_COLS.forEach((metaCol, index) -> {
+    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());

Review Comment:
   nit: consider renaming the class as well, to drop the `Acid` part, since these metadata columns are not ACID-specific.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -490,17 +491,30 @@ public List<VirtualColumn> acidVirtualColumns() {
   }
 
   @Override
-  public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
-    // TODO: make it configurable whether we want to include the table columns in the select query
-    // it might make delete writes faster if we don't have to write out the row object
-    return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+  public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {

Review Comment:
   nit: we could rename this, the other functions and class attributes using 
the `acid` keyword to something more general, like 
`virtualMetadataSelectColumns`. We can do this in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

