lcspinter commented on code in PR #3269:
URL: https://github.com/apache/hive/pull/3269#discussion_r869294281
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java:
##########
@@ -63,39 +60,19 @@ public void checkOutputSpecs(FileSystem ignored, JobConf
job) {
// Not doing any check.
}
- private static HiveIcebergWriterBase writer(JobConf jc) {
+ private static HiveIcebergWriter writer(JobConf jc) {
TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
// It gets the config from the FileSinkOperator which has its own config
for every target table
Table table = HiveIcebergStorageHandler.table(jc,
jc.get(hive_metastoreConstants.META_TABLE_NAME));
- Schema schema = HiveIcebergStorageHandler.schema(jc);
- FileFormat fileFormat =
FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
- TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH));
- long targetFileSize = PropertyUtil.propertyAsLong(table.properties(),
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
- TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- FileIO io = table.io();
- int partitionId = taskAttemptID.getTaskID().getId();
- int taskId = taskAttemptID.getId();
- String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" +
taskAttemptID.getJobID();
- OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table,
partitionId, taskId)
- .format(fileFormat)
- .operationId(operationId)
- .build();
String tableName = jc.get(Catalogs.NAME);
- if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
- HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table,
fileFormat, schema, null, fileFormat,
- null, null, null, schema);
- return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat,
writerFactory, outputFileFactory, io,
- targetFileSize, taskAttemptID, tableName);
- } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
- HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table,
fileFormat, schema, null, fileFormat,
- null, null, null, null);
- return new HiveIcebergUpdateWriter(schema, table.specs(),
table.spec().specId(), fileFormat, writerFactory,
- outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
- } else {
- HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table,
fileFormat, schema, null, fileFormat,
- null, null, null, schema);
- return new HiveIcebergRecordWriter(schema, table.specs(),
table.spec().specId(), fileFormat, writerFactory,
- outputFileFactory, io, targetFileSize, taskAttemptID, tableName,
false);
- }
+ int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE,
DELETE_FILE_THREAD_POOL_SIZE_DEFAULT);
+
+ return WriterBuilder.builderFor(table)
+ .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname))
+ .tableName(tableName)
Review Comment:
Sorry, my mistake. I confused it with table.name(), which returns
`catalogName.dbName.tblName`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]