[
https://issues.apache.org/jira/browse/HIVE-26202?focusedWorklogId=768510&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768510
]
ASF GitHub Bot logged work on HIVE-26202:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 10/May/22 13:46
Start Date: 10/May/22 13:46
Worklog Time Spent: 10m
Work Description: pvary commented on code in PR #3269:
URL: https://github.com/apache/hive/pull/3269#discussion_r869259860
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java:
##########
@@ -63,39 +60,19 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) {
     // Not doing any check.
   }
-  private static HiveIcebergWriterBase writer(JobConf jc) {
+  private static HiveIcebergWriter writer(JobConf jc) {
     TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
     // It gets the config from the FileSinkOperator which has its own config for every target table
     Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
-    Schema schema = HiveIcebergStorageHandler.schema(jc);
-    FileFormat fileFormat = FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
-        TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH));
-    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    FileIO io = table.io();
-    int partitionId = taskAttemptID.getTaskID().getId();
-    int taskId = taskAttemptID.getId();
-    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID();
-    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
-        .format(fileFormat)
-        .operationId(operationId)
-        .build();
     String tableName = jc.get(Catalogs.NAME);
-    if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat, writerFactory, outputFileFactory, io,
-          targetFileSize, taskAttemptID, tableName);
-    } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
-          null, null, null, null);
-      return new HiveIcebergUpdateWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
-    } else {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergRecordWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, false);
-    }
+    int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT);
+
+    return WriterBuilder.builderFor(table)
+        .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname))
+        .tableName(tableName)
Review Comment:
As far as I know, the table object does not carry a tableName.
Am I missing something?
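For readers following the hunk above: the change replaces a three-way if/else over delete, update, and insert with a single builder call that receives the table name separately from the table object. The following is only a rough sketch of that shape under stated assumptions. Every type and method in it (WriterBuilderSketch, Operation, Writer, Builder, operation(), describe()) is a hypothetical stand-in, not the actual Hive or Iceberg class, and a plain String stands in for the table object.

```java
import java.util.Objects;

// Hypothetical stand-ins only; none of these are the real Hive or Iceberg classes.
public class WriterBuilderSketch {

  enum Operation { INSERT, UPDATE, DELETE }

  interface Writer {
    String describe();
  }

  static final class Builder {
    private final String table;   // stands in for the table object
    private String tableName;     // supplied separately, as in the hunk
    private String queryId;
    private Operation operation = Operation.INSERT;

    private Builder(String table) {
      this.table = Objects.requireNonNull(table, "table");
    }

    static Builder builderFor(String table) {
      return new Builder(table);
    }

    Builder tableName(String name) {
      this.tableName = name;
      return this;
    }

    Builder queryId(String id) {
      this.queryId = id;
      return this;
    }

    Builder operation(Operation op) {
      this.operation = op;
      return this;
    }

    Writer build() {
      // The name is required here because this sketch assumes the table
      // stand-in cannot provide it on its own.
      Objects.requireNonNull(tableName, "tableName");
      String id = queryId + "-" + tableName;
      // The if/else chain from the call site moves into the builder.
      switch (operation) {
        case DELETE:
          return () -> "delete:" + id;
        case UPDATE:
          return () -> "update:" + id;
        default:
          return () -> "insert:" + id;
      }
    }
  }

  public static void main(String[] args) {
    Writer writer = Builder.builderFor("tableObject")
        .queryId("q1")
        .tableName("db.tbl")
        .operation(Operation.DELETE)
        .build();
    System.out.println(writer.describe());
  }
}
```

In this sketch the call site only states what it knows (query id, table name, operation) and the builder decides which writer to create, which is the design question the comment above touches on: whether tableName has to be passed in at all.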
Issue Time Tracking
-------------------
Worklog Id: (was: 768510)
Time Spent: 40m (was: 0.5h)
> Refactor Iceberg Writers
> ------------------------
>
> Key: HIVE-26202
> URL: https://issues.apache.org/jira/browse/HIVE-26202
> Project: Hive
> Issue Type: Improvement
> Reporter: Peter Vary
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>