[ 
https://issues.apache.org/jira/browse/HIVE-26136?focusedWorklogId=767056&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767056
 ]

ASF GitHub Bot logged work on HIVE-26136:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/May/22 07:20
            Start Date: 06/May/22 07:20
    Worklog Time Spent: 10m 
      Work Description: lcspinter commented on code in PR #3204:
URL: https://github.com/apache/hive/pull/3204#discussion_r866544611


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService 
executor, JobContext jobCont
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : 
conf.getNumMapTasks();
     });
 
-    Collection<FilesForCommit> writeResults = collectResults(numTasks, 
executor, location, jobContext, io, true);
-    if (writeResults.isEmpty()) {
-      LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete: 
{}, since there were no new files to add",
-          table, jobContext.getJobID(), 
HiveIcebergStorageHandler.isDelete(conf, name));
-    } else {
-      if (HiveIcebergStorageHandler.isDelete(conf, name)) {
-        commitDelete(table, Optional.empty(), startTime, writeResults);
+    FilesForCommit writeResults = collectResults(numTasks, executor, location, 
jobContext, io, true);
+    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+      if (writeResults.isEmpty()) {
+        LOG.info(
+            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, 
since there were no new files to add",
+            table, jobContext.getJobID(), 
HiveIcebergStorageHandler.isDelete(conf, name));
       } else {
-        boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE, 
false);
-        commitInsert(table, Optional.empty(), startTime, writeResults, 
isOverwrite);
+        commitWrite(table, startTime, writeResults);
       }
+    } else {
+      commitOverwrite(table, startTime, writeResults);
     }
   }
 
-  private void commitDelete(Table table, Optional<Transaction> txn, long 
startTime,
-      Collection<FilesForCommit> results) {
-    RowDelta append = 
txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
-    List<DeleteFile> deleteFiles = 
results.stream().map(FilesForCommit::deleteFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    deleteFiles.forEach(append::addDeletes);
-    append.commit();
-    LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
-        System.currentTimeMillis() - startTime, table, deleteFiles.size());
-    LOG.debug("Added delete files {}", deleteFiles);
+  private void commitWrite(Table table, long startTime, FilesForCommit 
results) {

Review Comment:
   nit: javadoc



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService 
executor, JobContext jobCont
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : 
conf.getNumMapTasks();
     });
 
-    Collection<FilesForCommit> writeResults = collectResults(numTasks, 
executor, location, jobContext, io, true);
-    if (writeResults.isEmpty()) {
-      LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete: 
{}, since there were no new files to add",
-          table, jobContext.getJobID(), 
HiveIcebergStorageHandler.isDelete(conf, name));
-    } else {
-      if (HiveIcebergStorageHandler.isDelete(conf, name)) {
-        commitDelete(table, Optional.empty(), startTime, writeResults);
+    FilesForCommit writeResults = collectResults(numTasks, executor, location, 
jobContext, io, true);
+    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+      if (writeResults.isEmpty()) {
+        LOG.info(
+            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, 
since there were no new files to add",
+            table, jobContext.getJobID(), 
HiveIcebergStorageHandler.isDelete(conf, name));
       } else {
-        boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE, 
false);
-        commitInsert(table, Optional.empty(), startTime, writeResults, 
isOverwrite);
+        commitWrite(table, startTime, writeResults);
       }
+    } else {
+      commitOverwrite(table, startTime, writeResults);
     }
   }
 
-  private void commitDelete(Table table, Optional<Transaction> txn, long 
startTime,
-      Collection<FilesForCommit> results) {
-    RowDelta append = 
txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
-    List<DeleteFile> deleteFiles = 
results.stream().map(FilesForCommit::deleteFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    deleteFiles.forEach(append::addDeletes);
-    append.commit();
-    LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
-        System.currentTimeMillis() - startTime, table, deleteFiles.size());
-    LOG.debug("Added delete files {}", deleteFiles);
+  private void commitWrite(Table table, long startTime, FilesForCommit 
results) {
+    if (results.deleteFiles().isEmpty()) {
+      AppendFiles write = table.newAppend();
+      results.dataFiles().forEach(write::appendFile);
+      write.commit();
+    } else {
+      RowDelta write = table.newRowDelta();
+      results.dataFiles().forEach(write::addRows);
+      results.deleteFiles().forEach(write::addDeletes);
+      write.commit();
+    }
+
+    LOG.info("Write commit took {} ms for table: {} with {} data and {} delete 
file(s)",
+        System.currentTimeMillis() - startTime, table, 
results.dataFiles().size(), results.deleteFiles().size());
+    LOG.debug("Added files {}", results);
   }
 
-  private void commitInsert(Table table, Optional<Transaction> txn, long 
startTime,
-      Collection<FilesForCommit> results, boolean isOverwrite) {
-    List<DataFile> dataFiles = results.stream().map(FilesForCommit::dataFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    if (isOverwrite) {
-      if (!dataFiles.isEmpty()) {
-        ReplacePartitions overwrite = 
txn.map(Transaction::newReplacePartitions).orElse(table.newReplacePartitions());
-        dataFiles.forEach(overwrite::addFile);
-        overwrite.commit();
-        LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime,
-            table, dataFiles.size());
-      } else if (table.spec().isUnpartitioned()) {
-        DeleteFiles deleteFiles = 
txn.map(Transaction::newDelete).orElse(table.newDelete());
-        deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
-        deleteFiles.commit();
-        LOG.info("Cleared table contents as part of empty overwrite for 
unpartitioned table. " +
-            "Commit took {} ms for table: {}", System.currentTimeMillis() - 
startTime, table);
-      }
-      LOG.debug("Overwrote partitions with files {}", dataFiles);
-    } else if (!dataFiles.isEmpty()) {
-      // Appending data files to the table
-      // We only create a new commit if there's something to append
-      AppendFiles append = 
txn.map(Transaction::newAppend).orElse(table.newAppend());
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Append commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
+  private void commitOverwrite(Table table, long startTime, FilesForCommit 
results) {

Review Comment:
   nit: javadoc



##########
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java:
##########
@@ -3330,6 +3332,23 @@ public static boolean isNonNativeAcidTable(Table table) {
         table.getStorageHandler().supportsAcidOperations() != 
HiveStorageHandler.AcidSupportType.NONE;
   }
 
+  /**
+   * Returns the virtual columns needed for update queries. For ACID queries 
it is a single ROW__ID, for non-native
+   * tables the list is provided by the {@link 
HiveStorageHandler#acidVirtualColumns()}.
+   * @param table The table for which we run the query
+   * @return The list of virtual columns used
+   */
+  public static List<VirtualColumn> getAcidVirtualColumns(Table table) {

Review Comment:
   nit: rename to getVirtualColumns



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java:
##########
@@ -115,20 +115,20 @@ public static PositionDelete<Record> 
getPositionDelete(Record rec, Record rowDat
    * @return The schema for reading files, extended with metadata columns 
needed for deletes
    */
   public static Schema createFileReadSchemaForUpdate(List<Types.NestedField> 
dataCols, Table table) {
-    List<Types.NestedField> cols = 
Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
-    SERDE_META_COLS.forEach((metaCol, index) -> {
+    List<Types.NestedField> cols = 
Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());

Review Comment:
   nit: Change the name of the class from acid.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -490,17 +491,30 @@ public List<VirtualColumn> acidVirtualColumns() {
   }
 
   @Override
-  public List<FieldSchema> 
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
-    // TODO: make it configurable whether we want to include the table columns 
in the select query
-    // it might make delete writes faster if we don't have to write out the 
row object
-    return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+  public List<FieldSchema> 
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation 
operation) {

Review Comment:
   nit: we could rename this, the other functions and class attributes using 
the `acid` keyword to something more general, like 
`virtualMetadataSelectColumns`. We can do this in a separate PR.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 767056)
    Time Spent: 1h  (was: 50m)

> Implement UPDATE statements for Iceberg tables
> ----------------------------------------------
>
>                 Key: HIVE-26136
>                 URL: https://issues.apache.org/jira/browse/HIVE-26136
>             Project: Hive
>          Issue Type: Task
>            Reporter: Peter Vary
>            Assignee: Peter Vary
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to