[
https://issues.apache.org/jira/browse/HIVE-26136?focusedWorklogId=767056&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767056
]
ASF GitHub Bot logged work on HIVE-26136:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/22 07:20
Start Date: 06/May/22 07:20
Worklog Time Spent: 10m
Work Description: lcspinter commented on code in PR #3204:
URL: https://github.com/apache/hive/pull/3204#discussion_r866544611
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService
executor, JobContext jobCont
return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() :
conf.getNumMapTasks();
});
- Collection<FilesForCommit> writeResults = collectResults(numTasks,
executor, location, jobContext, io, true);
- if (writeResults.isEmpty()) {
- LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete:
{}, since there were no new files to add",
- table, jobContext.getJobID(),
HiveIcebergStorageHandler.isDelete(conf, name));
- } else {
- if (HiveIcebergStorageHandler.isDelete(conf, name)) {
- commitDelete(table, Optional.empty(), startTime, writeResults);
+ FilesForCommit writeResults = collectResults(numTasks, executor, location,
jobContext, io, true);
+ if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+ if (writeResults.isEmpty()) {
+ LOG.info(
+ "Not creating a new commit for table: {}, jobID: {}, isDelete: {},
since there were no new files to add",
+ table, jobContext.getJobID(),
HiveIcebergStorageHandler.isDelete(conf, name));
} else {
- boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE,
false);
- commitInsert(table, Optional.empty(), startTime, writeResults,
isOverwrite);
+ commitWrite(table, startTime, writeResults);
}
+ } else {
+ commitOverwrite(table, startTime, writeResults);
}
}
- private void commitDelete(Table table, Optional<Transaction> txn, long
startTime,
- Collection<FilesForCommit> results) {
- RowDelta append =
txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
- List<DeleteFile> deleteFiles =
results.stream().map(FilesForCommit::deleteFiles)
- .flatMap(Collection::stream).collect(Collectors.toList());
- deleteFiles.forEach(append::addDeletes);
- append.commit();
- LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
- System.currentTimeMillis() - startTime, table, deleteFiles.size());
- LOG.debug("Added delete files {}", deleteFiles);
+ private void commitWrite(Table table, long startTime, FilesForCommit
results) {
Review Comment:
    nit: please add javadoc for this method.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService
executor, JobContext jobCont
return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() :
conf.getNumMapTasks();
});
- Collection<FilesForCommit> writeResults = collectResults(numTasks,
executor, location, jobContext, io, true);
- if (writeResults.isEmpty()) {
- LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete:
{}, since there were no new files to add",
- table, jobContext.getJobID(),
HiveIcebergStorageHandler.isDelete(conf, name));
- } else {
- if (HiveIcebergStorageHandler.isDelete(conf, name)) {
- commitDelete(table, Optional.empty(), startTime, writeResults);
+ FilesForCommit writeResults = collectResults(numTasks, executor, location,
jobContext, io, true);
+ if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+ if (writeResults.isEmpty()) {
+ LOG.info(
+ "Not creating a new commit for table: {}, jobID: {}, isDelete: {},
since there were no new files to add",
+ table, jobContext.getJobID(),
HiveIcebergStorageHandler.isDelete(conf, name));
} else {
- boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE,
false);
- commitInsert(table, Optional.empty(), startTime, writeResults,
isOverwrite);
+ commitWrite(table, startTime, writeResults);
}
+ } else {
+ commitOverwrite(table, startTime, writeResults);
}
}
- private void commitDelete(Table table, Optional<Transaction> txn, long
startTime,
- Collection<FilesForCommit> results) {
- RowDelta append =
txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
- List<DeleteFile> deleteFiles =
results.stream().map(FilesForCommit::deleteFiles)
- .flatMap(Collection::stream).collect(Collectors.toList());
- deleteFiles.forEach(append::addDeletes);
- append.commit();
- LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
- System.currentTimeMillis() - startTime, table, deleteFiles.size());
- LOG.debug("Added delete files {}", deleteFiles);
+ private void commitWrite(Table table, long startTime, FilesForCommit
results) {
+ if (results.deleteFiles().isEmpty()) {
+ AppendFiles write = table.newAppend();
+ results.dataFiles().forEach(write::appendFile);
+ write.commit();
+ } else {
+ RowDelta write = table.newRowDelta();
+ results.dataFiles().forEach(write::addRows);
+ results.deleteFiles().forEach(write::addDeletes);
+ write.commit();
+ }
+
+ LOG.info("Write commit took {} ms for table: {} with {} data and {} delete
file(s)",
+ System.currentTimeMillis() - startTime, table,
results.dataFiles().size(), results.deleteFiles().size());
+ LOG.debug("Added files {}", results);
}
- private void commitInsert(Table table, Optional<Transaction> txn, long
startTime,
- Collection<FilesForCommit> results, boolean isOverwrite) {
- List<DataFile> dataFiles = results.stream().map(FilesForCommit::dataFiles)
- .flatMap(Collection::stream).collect(Collectors.toList());
- if (isOverwrite) {
- if (!dataFiles.isEmpty()) {
- ReplacePartitions overwrite =
txn.map(Transaction::newReplacePartitions).orElse(table.newReplacePartitions());
- dataFiles.forEach(overwrite::addFile);
- overwrite.commit();
- LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)",
System.currentTimeMillis() - startTime,
- table, dataFiles.size());
- } else if (table.spec().isUnpartitioned()) {
- DeleteFiles deleteFiles =
txn.map(Transaction::newDelete).orElse(table.newDelete());
- deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
- deleteFiles.commit();
- LOG.info("Cleared table contents as part of empty overwrite for
unpartitioned table. " +
- "Commit took {} ms for table: {}", System.currentTimeMillis() -
startTime, table);
- }
- LOG.debug("Overwrote partitions with files {}", dataFiles);
- } else if (!dataFiles.isEmpty()) {
- // Appending data files to the table
- // We only create a new commit if there's something to append
- AppendFiles append =
txn.map(Transaction::newAppend).orElse(table.newAppend());
- dataFiles.forEach(append::appendFile);
- append.commit();
- LOG.info("Append commit took {} ms for table: {} with {} file(s)",
System.currentTimeMillis() - startTime, table,
- dataFiles.size());
- LOG.debug("Added files {}", dataFiles);
+ private void commitOverwrite(Table table, long startTime, FilesForCommit
results) {
Review Comment:
    nit: please add javadoc for this method.
##########
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java:
##########
@@ -3330,6 +3332,23 @@ public static boolean isNonNativeAcidTable(Table table) {
table.getStorageHandler().supportsAcidOperations() !=
HiveStorageHandler.AcidSupportType.NONE;
}
+ /**
+ * Returns the virtual columns needed for update queries. For ACID queries
it is a single ROW__ID, for non-native
+ * tables the list is provided by the {@link
HiveStorageHandler#acidVirtualColumns()}.
+ * @param table The table for which we run the query
+ * @return The list of virtual columns used
+ */
+ public static List<VirtualColumn> getAcidVirtualColumns(Table table) {
Review Comment:
nit: rename to getVirtualColumns
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java:
##########
@@ -115,20 +115,20 @@ public static PositionDelete<Record>
getPositionDelete(Record rec, Record rowDat
* @return The schema for reading files, extended with metadata columns
needed for deletes
*/
public static Schema createFileReadSchemaForUpdate(List<Types.NestedField>
dataCols, Table table) {
- List<Types.NestedField> cols =
Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
- SERDE_META_COLS.forEach((metaCol, index) -> {
+ List<Types.NestedField> cols =
Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
Review Comment:
    nit: Rename the class so that it no longer contains the word "Acid".
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -490,17 +491,30 @@ public List<VirtualColumn> acidVirtualColumns() {
}
@Override
- public List<FieldSchema>
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
- // TODO: make it configurable whether we want to include the table columns
in the select query
- // it might make delete writes faster if we don't have to write out the
row object
- return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ public List<FieldSchema>
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation
operation) {
Review Comment:
nit: we could rename this, the other functions and class attributes using
the `acid` keyword to something more general, like
`virtualMetadataSelectColumns`. We can do this in a separate PR.
Issue Time Tracking
-------------------
Worklog Id: (was: 767056)
Time Spent: 1h (was: 50m)
> Implement UPDATE statements for Iceberg tables
> ----------------------------------------------
>
> Key: HIVE-26136
> URL: https://issues.apache.org/jira/browse/HIVE-26136
> Project: Hive
> Issue Type: Task
> Reporter: Peter Vary
> Assignee: Peter Vary
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)