lcspinter commented on code in PR #3204:
URL: https://github.com/apache/hive/pull/3204#discussion_r866544611
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
     });
-    Collection<FilesForCommit> writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
-    if (writeResults.isEmpty()) {
-      LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
-          table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
-    } else {
-      if (HiveIcebergStorageHandler.isDelete(conf, name)) {
-        commitDelete(table, Optional.empty(), startTime, writeResults);
+    FilesForCommit writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
+    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+      if (writeResults.isEmpty()) {
+        LOG.info(
+            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
+            table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
       } else {
-        boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false);
-        commitInsert(table, Optional.empty(), startTime, writeResults, isOverwrite);
+        commitWrite(table, startTime, writeResults);
       }
+    } else {
+      commitOverwrite(table, startTime, writeResults);
     }
   }

-  private void commitDelete(Table table, Optional<Transaction> txn, long startTime,
-      Collection<FilesForCommit> results) {
-    RowDelta append = txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
-    List<DeleteFile> deleteFiles = results.stream().map(FilesForCommit::deleteFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    deleteFiles.forEach(append::addDeletes);
-    append.commit();
-    LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
-        System.currentTimeMillis() - startTime, table, deleteFiles.size());
-    LOG.debug("Added delete files {}", deleteFiles);
+  private void commitWrite(Table table, long startTime, FilesForCommit results) {
Review Comment:
nit: javadoc
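Something along these lines would do (just a sketch based on what the method does, exact wording is up to you):

    /**
     * Commits the collected write results to the table.
     * If there are no delete files, the data files are committed as a simple append;
     * otherwise a row delta containing both the data and the delete files is committed.
     * @param table The table we are writing to
     * @param startTime The start time of the commit, used for logging
     * @param results The object containing the new data and delete files
     */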
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -330,61 +331,54 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
     });
-    Collection<FilesForCommit> writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
-    if (writeResults.isEmpty()) {
-      LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
-          table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
-    } else {
-      if (HiveIcebergStorageHandler.isDelete(conf, name)) {
-        commitDelete(table, Optional.empty(), startTime, writeResults);
+    FilesForCommit writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
+    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+      if (writeResults.isEmpty()) {
+        LOG.info(
+            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
+            table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
       } else {
-        boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false);
-        commitInsert(table, Optional.empty(), startTime, writeResults, isOverwrite);
+        commitWrite(table, startTime, writeResults);
      }
+    } else {
+      commitOverwrite(table, startTime, writeResults);
     }
   }

-  private void commitDelete(Table table, Optional<Transaction> txn, long startTime,
-      Collection<FilesForCommit> results) {
-    RowDelta append = txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
-    List<DeleteFile> deleteFiles = results.stream().map(FilesForCommit::deleteFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    deleteFiles.forEach(append::addDeletes);
-    append.commit();
-    LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
-        System.currentTimeMillis() - startTime, table, deleteFiles.size());
-    LOG.debug("Added delete files {}", deleteFiles);
+  private void commitWrite(Table table, long startTime, FilesForCommit results) {
+    if (results.deleteFiles().isEmpty()) {
+      AppendFiles write = table.newAppend();
+      results.dataFiles().forEach(write::appendFile);
+      write.commit();
+    } else {
+      RowDelta write = table.newRowDelta();
+      results.dataFiles().forEach(write::addRows);
+      results.deleteFiles().forEach(write::addDeletes);
+      write.commit();
+    }
+
+    LOG.info("Write commit took {} ms for table: {} with {} data and {} delete file(s)",
+        System.currentTimeMillis() - startTime, table, results.dataFiles().size(), results.deleteFiles().size());
+    LOG.debug("Added files {}", results);
   }

-  private void commitInsert(Table table, Optional<Transaction> txn, long startTime,
-      Collection<FilesForCommit> results, boolean isOverwrite) {
-    List<DataFile> dataFiles = results.stream().map(FilesForCommit::dataFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    if (isOverwrite) {
-      if (!dataFiles.isEmpty()) {
-        ReplacePartitions overwrite = txn.map(Transaction::newReplacePartitions).orElse(table.newReplacePartitions());
-        dataFiles.forEach(overwrite::addFile);
-        overwrite.commit();
-        LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
-            table, dataFiles.size());
-      } else if (table.spec().isUnpartitioned()) {
-        DeleteFiles deleteFiles = txn.map(Transaction::newDelete).orElse(table.newDelete());
-        deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
-        deleteFiles.commit();
-        LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. " +
-            "Commit took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
-      }
-      LOG.debug("Overwrote partitions with files {}", dataFiles);
-    } else if (!dataFiles.isEmpty()) {
-      // Appending data files to the table
-      // We only create a new commit if there's something to append
-      AppendFiles append = txn.map(Transaction::newAppend).orElse(table.newAppend());
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Append commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
+  private void commitOverwrite(Table table, long startTime, FilesForCommit results) {
Review Comment:
nit: javadoc
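Same here, for example (sketched from the old commitInsert overwrite branch, please adjust to whatever the final behaviour of the method is):

    /**
     * Commits the collected data files as an overwrite.
     * Non-empty results replace the matching partitions; an empty overwrite of an
     * unpartitioned table clears the table contents instead.
     * @param table The table we are writing to
     * @param startTime The start time of the commit, used for logging
     * @param results The object containing the new data files
     */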
##########
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java:
##########
@@ -3330,6 +3332,23 @@ public static boolean isNonNativeAcidTable(Table table) {
         table.getStorageHandler().supportsAcidOperations() != HiveStorageHandler.AcidSupportType.NONE;
   }

+  /**
+   * Returns the virtual columns needed for update queries. For ACID queries it is a single ROW__ID, for non-native
+   * tables the list is provided by the {@link HiveStorageHandler#acidVirtualColumns()}.
+   * @param table The table for which we run the query
+   * @return The list of virtual columns used
+   */
+  public static List<VirtualColumn> getAcidVirtualColumns(Table table) {
Review Comment:
nit: rename to getVirtualColumns
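Something like this, i.e. only dropping the `Acid` part of the name (the body below is just a sketch based on the javadoc, keep whatever the method actually contains):

    public static List<VirtualColumn> getVirtualColumns(Table table) {
      if (isNonNativeAcidTable(table)) {
        // non-native tables provide their own list of virtual columns via the storage handler
        return table.getStorageHandler().acidVirtualColumns();
      }
      // native ACID tables only need ROW__ID
      return Collections.singletonList(VirtualColumn.ROWID);
    }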
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java:
##########
@@ -115,20 +115,20 @@ public static PositionDelete<Record> getPositionDelete(Record rec, Record rowDat
    * @return The schema for reading files, extended with metadata columns needed for deletes
    */
   public static Schema createFileReadSchemaForUpdate(List<Types.NestedField> dataCols, Table table) {
-    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
-    SERDE_META_COLS.forEach((metaCol, index) -> {
+    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
Review Comment:
nit: Change the name of the class as well, so it no longer references `acid`.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -490,17 +491,30 @@ public List<VirtualColumn> acidVirtualColumns() {
   }

   @Override
-  public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
-    // TODO: make it configurable whether we want to include the table columns in the select query
-    // it might make delete writes faster if we don't have to write out the row object
-    return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+  public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
Review Comment:
nit: we could rename this method, the other functions, and the class attributes that use the `acid` keyword to something more general, like `virtualMetadataSelectColumns`. We can do this in a separate PR.
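For example, on the handler side it could become something like this (name is only a suggestion, and the body is copied from the old one-argument acidSelectColumns purely to illustrate the rename; ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA could get a similar treatment):

    @Override
    public List<FieldSchema> virtualMetadataSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
      return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
          .flatMap(Collection::stream)
          .collect(Collectors.toList());
    }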