kasakrisz commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500474777


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
               LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
             }
           }, IOException.class);
+
+      // Merge task has merged several files into one. Hence we need to remove 
the stale files.
+      // At this stage the file is written and task-committed, but the old 
files are still present.
+      if 
(CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {

Review Comment:
   how about
   ```
   
jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)
   ```



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
               LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
             }
           }, IOException.class);
+
+      // Merge task has merged several files into one. Hence we need to remove 
the stale files.
+      // At this stage the file is written and task-committed, but the old 
files are still present.
+      if 
(CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {
+        MapWork mrwork = Utilities.getMapWork(jobConf);
+        if (mrwork != null) {
+          List<Path> mergedPaths = mrwork.getInputPaths();
+          if (CollectionUtils.isNotEmpty(mergedPaths)) {
+            Tasks.foreach(mergedPaths)

Review Comment:
   Does the empty check necessary? What happens when an empty collections is 
passed to `Tasks.foreach` ?



##########
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##########
@@ -512,7 +555,30 @@ private void setupWorkWhenUsingManifestFile(MapWork 
mapWork, List<FileStatus> fi
     mapWork.setUseInputPathsDirectly(true);
   }
 
-  private Map<FileStatus, List<FileStatus>> getManifestDirs(FileSystem inpFs, 
List<FileStatus> fileStatuses)
+  private void setupWorkWhenUsingCustomHandler(MapWork mapWork, Path dirPath,
+                                               StorageHandlerMergeProperties 
mergeProperties) throws ClassNotFoundException {
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork = 
mapWork.getAliasToWork();
+    Map<Path, PartitionDesc> pathToPartitionInfo = 
mapWork.getPathToPartitionInfo();
+    Operator<? extends OperatorDesc> op = aliasToWork.get(dirPath.toString());
+    PartitionDesc partitionDesc = pathToPartitionInfo.get(dirPath);
+    Path tmpDir = mergeProperties.getTmpLocation();

Review Comment:
   Can `mergeProperties` be null?
   
   I saw earlier that sometimes we set it to null:
   ```
   StorageHandlerMergeProperties mergeProperties = useCustomStorageHandler ?
               
storageHandler.getStorageHandlerMergeProperties(ctx.getCustomStorageHandlerProps())
 : null;
   ```



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+    this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+    String location = properties.getProperty(Catalogs.LOCATION);
+    return new Path(location + "/data/");
+  }
+
+  public String getInputFileFormatClassName() {
+    FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+    if (fileFormat == FileFormat.ORC) {
+      return OrcInputFormat.class.getName();
+    } else if (fileFormat == FileFormat.PARQUET) {
+      return MapredParquetInputFormat.class.getName();
+    } else if (fileFormat == FileFormat.AVRO) {
+      return AvroContainerInputFormat.class.getName();
+    }
+    return null;
+  }
+
+  public String getOutputFileFormatClassName() {
+    FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+    if (fileFormat == FileFormat.ORC) {
+      return OrcOutputFormat.class.getName();
+    } else if (fileFormat == FileFormat.PARQUET) {
+      return MapredParquetOutputFormat.class.getName();
+    } else if (fileFormat == FileFormat.AVRO) {
+      return AvroContainerOutputFormat.class.getName();
+    }
+    return null;
+  }
+
+  public String getFileSerdeClassName() {
+    FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+    if (fileFormat == FileFormat.ORC) {
+      return OrcSerde.class.getName();
+    } else if (fileFormat == FileFormat.PARQUET) {
+      return ParquetHiveSerDe.class.getName();
+    } else if (fileFormat == FileFormat.AVRO) {
+      return AvroSerDe.class.getName();
+    }
+    return null;
+  }

Review Comment:
   nit
   how about using some constant map and inner class to define this mapping and 
do a type check
   ```
     private static final class HiveFileFormat<
         I extends InputFormat<?, ?>,
         O extends OutputFormat<?,?>,
         S extends AbstractSerDe> {
       private final Class<I> inputFormatClass;
       private final Class<O> outputFormatClass;
       private final Class<S> serDeClass;
   
       public HiveFileFormat(Class<I> inputFormatClass, Class<O> 
outputFormatClass, Class<S> serDeClass) {
         this.inputFormatClass = inputFormatClass;
         this.outputFormatClass = outputFormatClass;
         this.serDeClass = serDeClass;
       }
     }
   
     private static final Map<FileFormat, HiveFileFormat<?,?,?>> fileFormats;
   
     static {
       fileFormats = new HashMap<>(3);
       fileFormats.put(FileFormat.ORC, new 
HiveFileFormat<>(OrcInputFormat.class, OrcOutputFormat.class, OrcSerde.class));
       fileFormats.put(FileFormat.PARQUET, new 
HiveFileFormat<>(MapredParquetInputFormat.class, 
MapredParquetOutputFormat.class, ParquetHiveSerDe.class));
       fileFormats.put(FileFormat.AVRO, new 
HiveFileFormat<>(OrcInputFormat.class, OrcOutputFormat.class, OrcSerde.class));
     }
   
     public String getInputFileFormatClassName() {
       HiveFileFormat<?, ?, ?> hiveFileFormat = 
fileFormats.get(FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT)));
       return hiveFileFormat != null ? 
hiveFileFormat.inputFormatClass.getName() : null;
     }
   ```



##########
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##########
@@ -370,7 +371,8 @@ private InputSplit[] getCombineSplits(JobConf job, int 
numSplits,
       PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
           pathToPartitionInfo, path, 
IOPrepareCache.get().allocatePartitionDescMap());
       TableDesc tableDesc = part.getTableDesc();
-      if ((tableDesc != null) && tableDesc.isNonNative()) {
+      boolean useDefaultFileFormat = 
part.getInputFileFormatClass().equals(tableDesc.getInputFileFormatClass());

Review Comment:
   Can `Class.isAssignableFrom` be used here?



##########
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##########
@@ -527,4 +593,20 @@ private Map<FileStatus, List<FileStatus>> 
getManifestDirs(FileSystem inpFs, List
     }
     return manifestDirsToPaths;
   }
+
+  private void setMergePropertiesToPartDesc(PartitionDesc partitionDesc,
+                                            StorageHandlerMergeProperties 
mergeProperties) throws ClassNotFoundException{
+    String inputFileFormatClassName = 
mergeProperties.getInputFileFormatClassName();
+    String outputFileFormatClassName = 
mergeProperties.getOutputFileFormatClassName();
+    String serdeClassName = mergeProperties.getFileSerdeClassName();

Review Comment:
   Can `mergeProperties` be null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to