bvaradar commented on a change in pull request #1687:
URL: https://github.com/apache/hudi/pull/1687#discussion_r444459286



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
##########
@@ -51,7 +49,6 @@
   private final long maxFileSize;
   private final HoodieAvroWriteSupport writeSupport;
   private final String instantTime;
-  private final Schema schema;

Review comment:
       thanks for cleaning this up

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -180,4 +183,10 @@ protected int getStageId() {
   protected long getAttemptId() {
     return sparkTaskContextSupplier.getAttemptIdSupplier().get();
   }
+
+  protected HoodieFileWriter getNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable,
+                                              HoodieWriteConfig config, Schema schema,
+                                              SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+    return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);

Review comment:
       Similar suggestion here: rename getFileWriter => createFileWriter.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -80,58 +77,6 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim
     return timeline;
   }
 
-  /**

Review comment:
       Can we avoid this cleanup? It is not directly related to this PR, and I am also making changes to these same methods.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
##########
@@ -150,11 +148,13 @@ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstan
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());

Review comment:
       nvm, found it in getRecordIterator

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -34,29 +34,28 @@
 
 import java.io.IOException;
 
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 
-public class HoodieStorageWriterFactory {
+public class HoodieFileWriterFactory {
 
-  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
+  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(

Review comment:
       @nsivabalan : Yes, this is what I was referring to, but it is not specific to storage alone. Other parts, such as the merge algorithm that needs to be configured per storage type, are also created here.

       @prashantwason : Let's revisit this in a subsequent PR where we parameterize the merge algorithm based on storage types. It is ok to keep the reader and writer factories separate.

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
##########
@@ -56,4 +61,9 @@ protected HoodieBaseFile getLatestDataFile() {
     return hoodieTable.getBaseFileOnlyView()
         .getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
   }
+
+  protected HoodieFileReader getNewFileReader() throws IOException {
+    return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),

Review comment:
       Minor: rename getFileReader => createFileReader?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
##########
@@ -150,11 +148,13 @@ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstan
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());

Review comment:
       Is this done somewhere else now?

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
##########
@@ -33,4 +37,12 @@
   void close() throws IOException;
 
   void writeAvro(String key, R oldRecord) throws IOException;
+
+  static Configuration registerFileSystem(Path file, Configuration conf) {

Review comment:
       Can we move this to the FSUtils class?

##########
File path: hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
##########
@@ -146,21 +146,22 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
-      if (!useRealTimeInputFormat) {
-        String inputFormatClassName = cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
-            : HoodieParquetInputFormat.class.getName();
-        hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
-            ParquetHiveSerDe.class.getName());
-      } else {
-        // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
-        // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
-        // /ql/exec/DDLTask.java#L3488
-        String inputFormatClassName =
-            cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
-                : HoodieParquetRealtimeInputFormat.class.getName();
-        hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
-            ParquetHiveSerDe.class.getName());
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(cfg.baseFileFormat.toUpperCase());
+      String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat,
+          new Configuration());
+
+      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && cfg.usePreApacheInputFormat) {
+        // Parquet input format had an InputFormat class visible under the old naming scheme.
+        inputFormatClassName = useRealTimeInputFormat
+            ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
+            : com.uber.hoodie.hadoop.HoodieInputFormat.class.getName();
       }
+
+      // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+      // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
+      // /ql/exec/DDLTask.java#L3488
+      hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),

Review comment:
       The output format (MapredParquetOutputFormat in the case of Parquet) would have to change depending on the storage type here, right? We need to parameterize that as well.
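
       A minimal sketch of what that parameterization could look like. Everything here is an assumption for illustration: the helper name getOutputFormatClassName is hypothetical, the enum is a local stand-in for HoodieFileFormat so the snippet compiles on its own, and the class name is written as a string to avoid a Hive dependency. The thread does not say which output format HFILE should map to, so that branch fails loudly instead of guessing.

```java
import java.util.Locale;

public class OutputFormatSketch {

  // Local stand-in for Hudi's HoodieFileFormat, declared here only to keep the sketch self-contained.
  enum BaseFileFormat { PARQUET, HFILE }

  // Hypothetical helper: picks the Hive output format class name from the base file format.
  static String getOutputFormatClassName(BaseFileFormat format) {
    switch (format) {
      case PARQUET:
        // Fully qualified name of the class that the diff currently hard-codes.
        return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
      default:
        // No output format for other formats has been agreed on in this review, so do not guess.
        throw new UnsupportedOperationException("No Hive output format chosen for " + format);
    }
  }

  public static void main(String[] args) {
    // Mirrors the valueOf(...toUpperCase()) parsing used in the diff above.
    BaseFileFormat format = BaseFileFormat.valueOf("parquet".toUpperCase(Locale.ROOT));
    System.out.println(getOutputFormatClassName(format));
  }
}
```

       With a helper of this shape, the createTable call could take its output format argument from the helper rather than from MapredParquetOutputFormat.class.getName() directly.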

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -356,20 +356,7 @@ public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompac
    * @return
    */
   public MessageType readSchemaFromLogFile(Path path) throws IOException {
-    FileSystem fs = metaClient.getRawFs();
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
-    HoodieAvroDataBlock lastBlock = null;
-    while (reader.hasNext()) {
-      HoodieLogBlock block = reader.next();
-      if (block instanceof HoodieAvroDataBlock) {
-        lastBlock = (HoodieAvroDataBlock) block;
-      }
-    }
-    reader.close();
-    if (lastBlock != null) {
-      return new AvroSchemaConverter().convert(lastBlock.getSchema());
-    }
-    return null;
+    return readSchemaFromLogFile(metaClient.getRawFs(), path);

Review comment:
       thanks for cleaning this up.

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.functional;
+
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+
+/**
+ * Tests HFile log format {@link HoodieHFileLogFormat}.

Review comment:
       HFile => Avro

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
##########
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.utils;
+package org.apache.hudi.common.util;

Review comment:
       Can we keep this in hudi-client?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -177,6 +177,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
     public String tableType;
 
+    @Parameter(names = {"--base-file-format"}, description = "File format for the base files. PARQUET (or) HFILE", required = false)

Review comment:
       A similar option needs to be added to the Spark SQL writer. See HoodieSparkSqlWriter.scala.
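
       One way to keep the two entry points consistent would be to share the parsing of the option value. The sketch below is only an illustration under stated assumptions: BaseFileFormatOption and parse are hypothetical names, the enum is a local stand-in for HoodieFileFormat, and defaulting to PARQUET when the option is absent is an assumption (the diff only shows that the option is not required).

```java
import java.util.Arrays;
import java.util.Locale;

public class BaseFileFormatOption {

  // Local stand-in for Hudi's HoodieFileFormat, limited to the two values named in the option description.
  enum BaseFileFormat { PARQUET, HFILE }

  // Hypothetical shared parser for the value of --base-file-format.
  static BaseFileFormat parse(String value) {
    if (value == null || value.trim().isEmpty()) {
      // Assumption: fall back to PARQUET when the optional flag is not supplied.
      return BaseFileFormat.PARQUET;
    }
    try {
      // Locale.ROOT keeps upper-casing independent of the JVM's default locale.
      return BaseFileFormat.valueOf(value.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unsupported base file format '" + value
          + "'. Expected one of " + Arrays.toString(BaseFileFormat.values()), e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("hfile"));
    System.out.println(parse(null));
  }
}
```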




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]