pvary commented on code in PR #3269:
URL: https://github.com/apache/hive/pull/3269#discussion_r869259860


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java:
##########
@@ -63,39 +60,19 @@ public void checkOutputSpecs(FileSystem ignored, JobConf 
job) {
     // Not doing any check.
   }
 
-  private static HiveIcebergWriterBase writer(JobConf jc) {
+  private static HiveIcebergWriter writer(JobConf jc) {
     TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
     // It gets the config from the FileSinkOperator which has its own config 
for every target table
     Table table = HiveIcebergStorageHandler.table(jc, 
jc.get(hive_metastoreConstants.META_TABLE_NAME));
-    Schema schema = HiveIcebergStorageHandler.schema(jc);
-    FileFormat fileFormat = 
FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
-        TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH));
-    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), 
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    FileIO io = table.io();
-    int partitionId = taskAttemptID.getTaskID().getId();
-    int taskId = taskAttemptID.getId();
-    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + 
taskAttemptID.getJobID();
-    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 
partitionId, taskId)
-        .format(fileFormat)
-        .operationId(operationId)
-        .build();
     String tableName = jc.get(Catalogs.NAME);
-    if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat, 
writerFactory, outputFileFactory, io,
-          targetFileSize, taskAttemptID, tableName);
-    } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
-          null, null, null, null);
-      return new HiveIcebergUpdateWriter(schema, table.specs(), 
table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
-    } else {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergRecordWriter(schema, table.specs(), 
table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, 
false);
-    }
+    int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE, 
DELETE_FILE_THREAD_POOL_SIZE_DEFAULT);
+
+    return WriterBuilder.builderFor(table)
+        .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname))
+        .tableName(tableName)

Review Comment:
   AFAIK there is no tableName for the table object.
   Am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to