[ 
https://issues.apache.org/jira/browse/HUDI-2161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381629#comment-17381629
 ] 

ASF GitHub Bot commented on HUDI-2161:
--------------------------------------

vinothchandar commented on a change in pull request #3247:
URL: https://github.com/apache/hudi/pull/3247#discussion_r670175121



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+
+/**
+ * RowCreateHandle to be used when meta columns are disabled.
+ */
+public class HoodieAppendOnlyRowCreateHandle extends HoodieRowCreateHandle {

Review comment:
       rename: `HoodieNoMetaRowCreateHandle` or sth? lets not leak higher level 
use-cases into class names low down the stack? Row create handle implies we are 
appending data?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+
+/**
+ * RowCreateHandle to be used when meta columns are disabled.
+ */
+public class HoodieAppendOnlyRowCreateHandle extends HoodieRowCreateHandle {
+
+  public HoodieAppendOnlyRowCreateHandle(HoodieTable table, HoodieWriteConfig 
writeConfig, String partitionPath, String fileId, String instantTime,
+                                         int taskPartitionId, long taskId, 
long taskEpochId, StructType structType) {
+    super(table, writeConfig, partitionPath, fileId, instantTime, 
taskPartitionId, taskId, taskEpochId, structType);
+  }
+
+  /**
+   * Write the incoming InternalRow as is.
+   *
+   * @param record instance of {@link InternalRow} that needs to be written to 
the fileWriter.
+   * @throws IOException
+   */
+  @Override
+  public void write(InternalRow record) throws IOException {
+    try {
+      fileWriter.writeRow("", record);

Review comment:
       what are all the empty string? can we avoid such calls?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieAppendOnlyInternalRowParquetWriter.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+import java.io.IOException;
+
+public class HoodieAppendOnlyInternalRowParquetWriter extends 
HoodieInternalRowParquetWriter {

Review comment:
       rename this too?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -116,5 +185,24 @@ void buildFieldPositionMapIfNeeded(StructType structType) {
       this.structType = structType;
     }
   }
+
+  synchronized void buildFieldDataTypesMapIfNeeded(StructType structType) {

Review comment:
       why do we synchronize? any static state that needs to be protected? 
Would be good to avoid if possible

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -675,6 +676,11 @@ public PropertyBuilder setBootstrapBasePath(String 
bootstrapBasePath) {
       return this;
     }
 
+    public PropertyBuilder setPopulateMetaColumns(boolean populateMetaColumns) 
{

Review comment:
       we call the constants - metaFields?  So should we standardize on `field` 
over `column` everywher?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -128,14 +128,35 @@ object HoodieSparkSqlWriter {
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, 
null))
           .setPartitionColumns(partitionColumns)
+          
.setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(),
 HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean)
           .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
+      } else {
+        // validate table properties
+        val tableMetaClient = 
HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build()
+        if (!tableMetaClient.getTableConfig.populateMetaColumns() &&
+          
parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), 
HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) {
+          throw new 
HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " 
already disabled for the table. Can't be enabled back. ")

Review comment:
       cant be re-enabled.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java
##########
@@ -27,24 +27,24 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.types.StructType;
 
-import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
-
 import java.io.IOException;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
 /**
  * Factory to assist in instantiating a new {@link 
HoodieInternalRowFileWriter}.
  */
 public class HoodieInternalRowFileWriterFactory {
 
   /**
    * Factory method to assist in instantiating an instance of {@link 
HoodieInternalRowFileWriter}.
-   * @param path path of the RowFileWriter.
+   *
+   * @param path        path of the RowFileWriter.
    * @param hoodieTable instance of {@link HoodieTable} in use.

Review comment:
       can we please avoid all the whitespace changes. It makes review actually 
pretty hard.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+
+/**
+ * RowCreateHandle to be used when meta columns are disabled.
+ */
+public class HoodieAppendOnlyRowCreateHandle extends HoodieRowCreateHandle {
+
+  public HoodieAppendOnlyRowCreateHandle(HoodieTable table, HoodieWriteConfig 
writeConfig, String partitionPath, String fileId, String instantTime,
+                                         int taskPartitionId, long taskId, 
long taskEpochId, StructType structType) {
+    super(table, writeConfig, partitionPath, fileId, instantTime, 
taskPartitionId, taskId, taskEpochId, structType);
+  }
+
+  /**
+   * Write the incoming InternalRow as is.
+   *
+   * @param record instance of {@link InternalRow} that needs to be written to 
the fileWriter.
+   * @throws IOException
+   */
+  @Override
+  public void write(InternalRow record) throws IOException {
+    try {
+      fileWriter.writeRow("", record);
+      writeStatus.markSuccess("");
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;

Review comment:
       wrap into a HoodieException?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -128,14 +128,35 @@ object HoodieSparkSqlWriter {
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, 
null))
           .setPartitionColumns(partitionColumns)
+          
.setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(),
 HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean)
           .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
+      } else {
+        // validate table properties
+        val tableMetaClient = 
HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build()

Review comment:
       In any case, please pull all this validation into its own method.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java
##########
@@ -60,20 +60,46 @@ private static HoodieInternalRowFileWriter 
newParquetInternalRowFileWriter(
       Path path, HoodieWriteConfig writeConfig, StructType structType, 
HoodieTable table)
       throws IOException {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(
-            writeConfig.getBloomFilterNumEntries(),
-            writeConfig.getBloomFilterFPP(),
-            writeConfig.getDynamicBloomFilterMaxNumEntries(),
-            writeConfig.getBloomFilterType());
+        writeConfig.getBloomFilterNumEntries(),
+        writeConfig.getBloomFilterFPP(),
+        writeConfig.getDynamicBloomFilterMaxNumEntries(),
+        writeConfig.getBloomFilterType());
     HoodieRowParquetWriteSupport writeSupport =
-            new HoodieRowParquetWriteSupport(table.getHadoopConf(), 
structType, filter);
+        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, 
filter);
     return new HoodieInternalRowParquetWriter(
         path, new HoodieRowParquetConfig(
-            writeSupport,
-            writeConfig.getParquetCompressionCodec(),
-            writeConfig.getParquetBlockSize(),
-            writeConfig.getParquetPageSize(),
-            writeConfig.getParquetMaxFileSize(),
-            writeSupport.getHadoopConf(),
-            writeConfig.getParquetCompressionRatio()));
+        writeSupport,
+        writeConfig.getParquetCompressionCodec(),
+        writeConfig.getParquetBlockSize(),
+        writeConfig.getParquetPageSize(),
+        writeConfig.getParquetMaxFileSize(),
+        writeSupport.getHadoopConf(),
+        writeConfig.getParquetCompressionRatio()));
+  }
+
+  public static HoodieInternalRowFileWriter getInternalRowAppendOnlyFileWriter(
+      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType 
schema)
+      throws IOException {
+    final String extension = FSUtils.getFileExtension(path.getName());
+    if (PARQUET.getFileExtension().equals(extension)) {
+      return newParquetInternalRowAppendOnlyFileWriter(path, config, schema, 
hoodieTable);
+    }
+    throw new UnsupportedOperationException(extension + " format not supported 
yet.");

Review comment:
       Always throw Hoodie specific versions of the exceptions please

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -18,48 +18,71 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionHelper;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.package$;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
-import scala.Function1;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 /**
  * Base class for the built-in key generators. Contains methods structured for
  * code reuse amongst them.
  */
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
+  private static final Logger LOG = 
LogManager.getLogger(BuiltinKeyGenerator.class);
+
   private static final String STRUCT_NAME = "hoodieRowTopLevelField";
   private static final String NAMESPACE = "hoodieRow";
   private transient Function1<Object, Object> converterFn = null;
   protected StructType structType;
+  protected ExpressionEncoder encoder;
 
   protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
   protected Map<String, List<Integer>> partitionPathPositions = new 
HashMap<>();
+  protected Map<String, List<DataType>> partitionPathDataTypes = null;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
   /**
    * Fetch record key from {@link Row}.
+   *
    * @param row instance of {@link Row} from which record key is requested.
    * @return the record key of interest from {@link Row}.
    */
   @Override
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
     if (null == converterFn) {
+      LOG.warn("Instantiating row converter fn 11 ");

Review comment:
       remove all debug logging like this?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java
##########
@@ -60,20 +60,46 @@ private static HoodieInternalRowFileWriter 
newParquetInternalRowFileWriter(
       Path path, HoodieWriteConfig writeConfig, StructType structType, 
HoodieTable table)
       throws IOException {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(
-            writeConfig.getBloomFilterNumEntries(),
-            writeConfig.getBloomFilterFPP(),
-            writeConfig.getDynamicBloomFilterMaxNumEntries(),
-            writeConfig.getBloomFilterType());
+        writeConfig.getBloomFilterNumEntries(),
+        writeConfig.getBloomFilterFPP(),
+        writeConfig.getDynamicBloomFilterMaxNumEntries(),
+        writeConfig.getBloomFilterType());
     HoodieRowParquetWriteSupport writeSupport =
-            new HoodieRowParquetWriteSupport(table.getHadoopConf(), 
structType, filter);
+        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, 
filter);
     return new HoodieInternalRowParquetWriter(
         path, new HoodieRowParquetConfig(
-            writeSupport,
-            writeConfig.getParquetCompressionCodec(),
-            writeConfig.getParquetBlockSize(),
-            writeConfig.getParquetPageSize(),
-            writeConfig.getParquetMaxFileSize(),
-            writeSupport.getHadoopConf(),
-            writeConfig.getParquetCompressionRatio()));
+        writeSupport,
+        writeConfig.getParquetCompressionCodec(),
+        writeConfig.getParquetBlockSize(),
+        writeConfig.getParquetPageSize(),
+        writeConfig.getParquetMaxFileSize(),
+        writeSupport.getHadoopConf(),
+        writeConfig.getParquetCompressionRatio()));
+  }
+
+  public static HoodieInternalRowFileWriter getInternalRowAppendOnlyFileWriter(
+      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType 
schema)
+      throws IOException {
+    final String extension = FSUtils.getFileExtension(path.getName());

Review comment:
       can we check this based on base file format of the `hoodieTable` that's 
better than relying on the file extension

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieAppendOnlyRowParquetWriteSupport.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+
+/**
+ * Hoodie Write Support for directly writing Row to Parquet.
+ */
+public class HoodieAppendOnlyRowParquetWriteSupport extends 
HoodieRowParquetWriteSupport {

Review comment:
       rename?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;

Review comment:
       move this to org.apache.hudi.io.row?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -81,7 +105,52 @@ public String getPartitionPath(Row row) {
     return getKey(genericRecord).getPartitionPath();
   }
 
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  /**
+   * Fetch partition path from {@link InternalRow}.
+   *
+   * @param internalRow {@link InternalRow} instance from which partition path 
needs to be fetched from.
+   * @param structType  schema of the internalRow.
+   * @return the partition path.
+   */
+  public String getPartitionPath(InternalRow internalRow, StructType 
structType) {
+    try {
+      Row row = deserializeRow(getEncoder(structType), internalRow);
+      return getPartitionPath(row);
+    } catch (Exception e) {
+      throw new HoodieIOException("Conversion of InternalRow to Row failed 
with exception " + e);
+    }
+  }
+
+  private ExpressionEncoder getEncoder(StructType structType) {
+    if (encoder == null) {
+      encoder = getRowEncoder(structType);
+    }
+    return encoder;
+  }
+
+  private static ExpressionEncoder getRowEncoder(StructType schema) {

Review comment:
       rename to `void initEncoder()` ?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -81,7 +105,52 @@ public String getPartitionPath(Row row) {
     return getKey(genericRecord).getPartitionPath();
   }
 
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  /**
+   * Fetch partition path from {@link InternalRow}.
+   *
+   * @param internalRow {@link InternalRow} instance from which partition path 
needs to be fetched from.
+   * @param structType  schema of the internalRow.
+   * @return the partition path.
+   */
+  public String getPartitionPath(InternalRow internalRow, StructType 
structType) {
+    try {
+      Row row = deserializeRow(getEncoder(structType), internalRow);
+      return getPartitionPath(row);
+    } catch (Exception e) {
+      throw new HoodieIOException("Conversion of InternalRow to Row failed 
with exception " + e);
+    }
+  }
+
+  private ExpressionEncoder getEncoder(StructType structType) {
+    if (encoder == null) {
+      encoder = getRowEncoder(structType);
+    }
+    return encoder;
+  }
+
+  private static ExpressionEncoder getRowEncoder(StructType schema) {
+    List<Attribute> attributes = 
JavaConversions.asJavaCollection(schema.toAttributes()).stream()
+        .map(Attribute::toAttribute).collect(Collectors.toList());
+    return RowEncoder.apply(schema)

Review comment:
       have you tested this with both spark 3 and 2 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java
##########
@@ -60,20 +60,46 @@ private static HoodieInternalRowFileWriter 
newParquetInternalRowFileWriter(
       Path path, HoodieWriteConfig writeConfig, StructType structType, 
HoodieTable table)
       throws IOException {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(
-            writeConfig.getBloomFilterNumEntries(),
-            writeConfig.getBloomFilterFPP(),
-            writeConfig.getDynamicBloomFilterMaxNumEntries(),
-            writeConfig.getBloomFilterType());
+        writeConfig.getBloomFilterNumEntries(),
+        writeConfig.getBloomFilterFPP(),
+        writeConfig.getDynamicBloomFilterMaxNumEntries(),
+        writeConfig.getBloomFilterType());
     HoodieRowParquetWriteSupport writeSupport =
-            new HoodieRowParquetWriteSupport(table.getHadoopConf(), 
structType, filter);
+        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, 
filter);
     return new HoodieInternalRowParquetWriter(
         path, new HoodieRowParquetConfig(
-            writeSupport,
-            writeConfig.getParquetCompressionCodec(),
-            writeConfig.getParquetBlockSize(),
-            writeConfig.getParquetPageSize(),
-            writeConfig.getParquetMaxFileSize(),
-            writeSupport.getHadoopConf(),
-            writeConfig.getParquetCompressionRatio()));
+        writeSupport,
+        writeConfig.getParquetCompressionCodec(),
+        writeConfig.getParquetBlockSize(),
+        writeConfig.getParquetPageSize(),
+        writeConfig.getParquetMaxFileSize(),
+        writeSupport.getHadoopConf(),
+        writeConfig.getParquetCompressionRatio()));
+  }
+
+  public static HoodieInternalRowFileWriter getInternalRowAppendOnlyFileWriter(

Review comment:
       rename

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -79,4 +82,28 @@ public String getPartitionPath(Row row) {
       throw new HoodieKeyGeneratorException("Unable to parse input partition 
field :" + fieldVal, e);
     }
   }
+
+  @Override
+  public String getPartitionPath(InternalRow internalRow, StructType 
structType) {

Review comment:
       can we see if we can simply/reuse more code across these key generator 
classes?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -81,7 +105,52 @@ public String getPartitionPath(Row row) {
     return getKey(genericRecord).getPartitionPath();
   }
 
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  /**
+   * Fetch partition path from {@link InternalRow}.
+   *
+   * @param internalRow {@link InternalRow} instance from which partition path 
needs to be fetched from.
+   * @param structType  schema of the internalRow.
+   * @return the partition path.
+   */
+  public String getPartitionPath(InternalRow internalRow, StructType 
structType) {
+    try {
+      Row row = deserializeRow(getEncoder(structType), internalRow);
+      return getPartitionPath(row);
+    } catch (Exception e) {
+      throw new HoodieIOException("Conversion of InternalRow to Row failed 
with exception " + e);
+    }
+  }
+
+  private ExpressionEncoder getEncoder(StructType structType) {
+    if (encoder == null) {
+      encoder = getRowEncoder(structType);
+    }
+    return encoder;
+  }
+
+  private static ExpressionEncoder getRowEncoder(StructType schema) {
+    List<Attribute> attributes = 
JavaConversions.asJavaCollection(schema.toAttributes()).stream()
+        .map(Attribute::toAttribute).collect(Collectors.toList());
+    return RowEncoder.apply(schema)
+        
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
+            SimpleAnalyzer$.MODULE$);
+  }
+
+  private static Row deserializeRow(ExpressionEncoder encoder, InternalRow row)
+      throws InvocationTargetException, IllegalAccessException, 
NoSuchMethodException, ClassNotFoundException {
+    // TODO remove reflection if Spark 2.x support is dropped
+    if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {

Review comment:
       I think there is a similar method in `spark-common` like this?  Can we 
try and move that up here, so we can avoid this code duplication? It ll become 
harder and harder to maintain

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -81,7 +105,52 @@ public String getPartitionPath(Row row) {
     return getKey(genericRecord).getPartitionPath();
   }
 
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  /**
+   * Fetch partition path from {@link InternalRow}.
+   *
+   * @param internalRow {@link InternalRow} instance from which partition path 
needs to be fetched from.
+   * @param structType  schema of the internalRow.
+   * @return the partition path.
+   */
+  public String getPartitionPath(InternalRow internalRow, StructType 
structType) {
+    try {
+      Row row = deserializeRow(getEncoder(structType), internalRow);
+      return getPartitionPath(row);
+    } catch (Exception e) {
+      throw new HoodieIOException("Conversion of InternalRow to Row failed 
with exception " + e);
+    }
+  }
+
+  private ExpressionEncoder getEncoder(StructType structType) {
+    if (encoder == null) {

Review comment:
       ```Java 
   if (encoder == null) {
     initEncoder();
   }
   return encoder;
   
   ```
   
   its already an instant variable, why pass it back and forth between private 
methods?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
##########
@@ -72,4 +75,16 @@ public String getPartitionPath(Row row) {
     return RowKeyGeneratorHelper.getPartitionPathFromRow(row, 
getPartitionPathFields(),
         hiveStylePartitioning, partitionPathPositions);
   }
+
+  @Override
+  public String getPartitionPath(InternalRow row, StructType structType) {
+    buildFieldDataTypesMapIfNeeded(structType);

Review comment:
       is it possible to pass the `structType` just once during creation of the 
key generator?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -128,14 +128,35 @@ object HoodieSparkSqlWriter {
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, 
null))
           .setPartitionColumns(partitionColumns)
+          
.setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(),
 HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean)
           .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
+      } else {
+        // validate table properties
+        val tableMetaClient = 
HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build()

Review comment:
       is there other places where we do such validation. I think this should 
be done at the WriteClient level or so? not this high up the stack?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -128,14 +128,35 @@ object HoodieSparkSqlWriter {
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, 
null))
           .setPartitionColumns(partitionColumns)
+          
.setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(),
 HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean)
           .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
+      } else {
+        // validate table properties
+        val tableMetaClient = 
HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build()
+        if (!tableMetaClient.getTableConfig.populateMetaColumns() &&
+          
parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), 
HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) {
+          throw new 
HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " 
already disabled for the table. Can't be enabled back. ")
+        }
+      }
+
+      if ( 
!parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), 
HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean
+        && operation != WriteOperationType.BULK_INSERT) {
+        throw new 
HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " can 
only be enabled for " + WriteOperationType.BULK_INSERT

Review comment:
       you mean - can only be disabled for?

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
##########
@@ -103,20 +161,22 @@ private HoodieRowCreateHandle getRowCreateHandle(String 
partitionPath) throws IO
       if (arePartitionRecordsSorted) {
         close();
       }
-      handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, 
writeConfig, partitionPath, getNextFileId(),
+      handles.put(partitionPath, populateMetaColumns ? new 
HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),

Review comment:
       lets pull out the ternary operator into its own line.. and have a 
variable to hold the create handle

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
##########
@@ -52,30 +62,78 @@
   private final StructType structType;
   private final Boolean arePartitionRecordsSorted;
   private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
-
+  private final boolean populateMetaColumns;
   private HoodieRowCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private String fileIdPrefix;
   private int numFilesWritten = 0;
   private Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
+  private BuiltinKeyGenerator keyGenerator = null;
+  private boolean simpleKeyGen = false;
+  private int simplePartitionFieldIndex = -1;
+  private DataType simplePartitionFieldDataType;
 
   public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
-      String instantTime, int taskPartitionId, long taskId, long taskEpochId, 
StructType structType, boolean arePartitionRecordsSorted) {
+                                            String instantTime, int 
taskPartitionId, long taskId, long taskEpochId, StructType structType,
+                                            boolean populateMetaColumns, 
boolean arePartitionRecordsSorted) {
     this.hoodieTable = hoodieTable;
     this.writeConfig = writeConfig;
     this.instantTime = instantTime;
     this.taskPartitionId = taskPartitionId;
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
     this.structType = structType;
+    this.populateMetaColumns = populateMetaColumns;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
     this.fileIdPrefix = UUID.randomUUID().toString();
+    if (!populateMetaColumns) {
+      this.keyGenerator = getKeyGenerator(writeConfig.getProps());
+      if (keyGenerator instanceof SimpleKeyGenerator) {
+        simpleKeyGen = true;
+        simplePartitionFieldIndex = (Integer) 
structType.getFieldIndex((keyGenerator).getPartitionPathFields().get(0)).get();
+        simplePartitionFieldDataType = 
structType.fields()[simplePartitionFieldIndex].dataType();
+      }
+    }
+  }
+
+  /**
+   * Instantiate {@link BuiltinKeyGenerator}.
+   *
+   * @param properties properties map.
+   * @return the key generator thus instantiated.
+   */
+  private BuiltinKeyGenerator getKeyGenerator(Properties properties) {
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.putAll(properties);
+    if 
(properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()).equals(NonpartitionedKeyGenerator.class.getName()))
 {
+      return null; // Do not instantiate NonPartitionKeyGen
+    } else {
+      try {
+        return (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties);
+      } catch (ClassCastException cce) {
+        throw new HoodieIOException("Only those key gens implementing 
BuiltInKeyGenerator interface is supported in disabling meta columns path");
+      } catch (IOException e) {
+        throw new HoodieIOException("Key generator instantiation failed ", e);
+      }
+    }
   }
 
   public void write(InternalRow record) throws IOException {
     try {
-      String partitionPath = record.getUTF8String(
-          
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String partitionPath = null;
+      if (populateMetaColumns) { // usual path where meta columns are pre 
populated in prep step.
+        partitionPath = record.getUTF8String(
+            
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      } else { // if meta columns are disabled.
+        if (keyGenerator == null) { // NoPartitionerKeyGen

Review comment:
       argh

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -138,6 +138,11 @@
       .noDefaultValue()
       .withDocumentation("Base path of the dataset that needs to be 
bootstrapped as a Hudi table");
 
+  public static final ConfigProperty<String> HOODIE_POPULATE_META_COLUMNS = 
ConfigProperty
+      .key("hoodie.populate.meta.columns")
+      .defaultValue("true")
+      .withDocumentation("When enabled, populates all meta columns. When 
disabled, no meta columns are populated");

Review comment:
       add `.. and incremental queries will not be functional. This is only 
meant to be used for append only/immutable data for batch processing` 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
##########
@@ -52,30 +62,78 @@
   private final StructType structType;
   private final Boolean arePartitionRecordsSorted;
   private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
-
+  private final boolean populateMetaColumns;
   private HoodieRowCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private String fileIdPrefix;
   private int numFilesWritten = 0;
   private Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
+  private BuiltinKeyGenerator keyGenerator = null;
+  private boolean simpleKeyGen = false;
+  private int simplePartitionFieldIndex = -1;
+  private DataType simplePartitionFieldDataType;
 
   public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
-      String instantTime, int taskPartitionId, long taskId, long taskEpochId, 
StructType structType, boolean arePartitionRecordsSorted) {
+                                            String instantTime, int 
taskPartitionId, long taskId, long taskEpochId, StructType structType,
+                                            boolean populateMetaColumns, 
boolean arePartitionRecordsSorted) {
     this.hoodieTable = hoodieTable;
     this.writeConfig = writeConfig;
     this.instantTime = instantTime;
     this.taskPartitionId = taskPartitionId;
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
     this.structType = structType;
+    this.populateMetaColumns = populateMetaColumns;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
     this.fileIdPrefix = UUID.randomUUID().toString();
+    if (!populateMetaColumns) {
+      this.keyGenerator = getKeyGenerator(writeConfig.getProps());
+      if (keyGenerator instanceof SimpleKeyGenerator) {
+        simpleKeyGen = true;
+        simplePartitionFieldIndex = (Integer) 
structType.getFieldIndex((keyGenerator).getPartitionPathFields().get(0)).get();
+        simplePartitionFieldDataType = 
structType.fields()[simplePartitionFieldIndex].dataType();
+      }
+    }
+  }
+
+  /**
+   * Instantiate {@link BuiltinKeyGenerator}.
+   *
+   * @param properties properties map.
+   * @return the key generator thus instantiated.
+   */
+  private BuiltinKeyGenerator getKeyGenerator(Properties properties) {
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.putAll(properties);
+    if 
(properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()).equals(NonpartitionedKeyGenerator.class.getName()))
 {
+      return null; // Do not instantiate NonPartitionKeyGen
+    } else {
+      try {
+        return (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties);
+      } catch (ClassCastException cce) {
+        throw new HoodieIOException("Only those key gens implementing 
BuiltInKeyGenerator interface is supported in disabling meta columns path");

Review comment:
       please write the error messages from a user facing angle. `meta columns 
path` can be a confusing thing to read, when all users do is deal with 
file/folder paths.

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -128,14 +128,35 @@ object HoodieSparkSqlWriter {
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, 
null))
           .setPartitionColumns(partitionColumns)
+          
.setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(),
 HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean)
           .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
+      } else {
+        // validate table properties
+        val tableMetaClient = 
HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build()
+        if (!tableMetaClient.getTableConfig.populateMetaColumns() &&
+          
parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), 
HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) {
+          throw new 
HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " 
already disabled for the table. Can't be enabled back. ")
+        }
+      }
+
+      if ( 
!parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), 
HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean
+        && operation != WriteOperationType.BULK_INSERT) {
+        throw new 
HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " can 
only be enabled for " + WriteOperationType.BULK_INSERT
+          + " operation");
       }
 
       val commitActionType = CommitUtils.getCommitActionType(operation, 
tableConfig.getTableType)
 
       // short-circuit if bulk_insert via row is enabled.
       // scalastyle:off
+      if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER_OPT_KEY) &&

Review comment:
       please move the entire row writing block into its own method. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
##########
@@ -112,4 +116,40 @@
 
     return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, 
config.getBulkInsertShuffleParallelism());
   }
+
+  /**
+   * Add empty meta columns and reorder such that meta columns are at the 
beginning.
+   *
+   * @param rows
+   * @return
+   */
+  public static Dataset<Row> 
prepareHoodieDatasetForBulkInsertAppendOnly(Dataset<Row> rows) {

Review comment:
       `prepareHoodieDatasetForBulkInsertWithoutMetaFields` , just one 
terminology to talk about a fork in the code path 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -298,7 +298,7 @@ object DataSourceWriteOptions {
   val ENABLE_ROW_WRITER_OPT_KEY: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.row.writer.enable")
     .defaultValue("false")
-    .withDocumentation("")
+    .withDocumentation("Will perform write operations directly using the spark 
native `Row` representation")

Review comment:
       I fixed this already. please rebase and consolidate.

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
##########
@@ -52,30 +62,78 @@
   private final StructType structType;
   private final Boolean arePartitionRecordsSorted;
   private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
-
+  private final boolean populateMetaColumns;
   private HoodieRowCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private String fileIdPrefix;
   private int numFilesWritten = 0;
   private Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
+  private BuiltinKeyGenerator keyGenerator = null;
+  private boolean simpleKeyGen = false;
+  private int simplePartitionFieldIndex = -1;
+  private DataType simplePartitionFieldDataType;
 
   public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
-      String instantTime, int taskPartitionId, long taskId, long taskEpochId, 
StructType structType, boolean arePartitionRecordsSorted) {
+                                            String instantTime, int 
taskPartitionId, long taskId, long taskEpochId, StructType structType,
+                                            boolean populateMetaColumns, 
boolean arePartitionRecordsSorted) {
     this.hoodieTable = hoodieTable;
     this.writeConfig = writeConfig;
     this.instantTime = instantTime;
     this.taskPartitionId = taskPartitionId;
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
     this.structType = structType;
+    this.populateMetaColumns = populateMetaColumns;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
     this.fileIdPrefix = UUID.randomUUID().toString();
+    if (!populateMetaColumns) {
+      this.keyGenerator = getKeyGenerator(writeConfig.getProps());
+      if (keyGenerator instanceof SimpleKeyGenerator) {
+        simpleKeyGen = true;
+        simplePartitionFieldIndex = (Integer) 
structType.getFieldIndex((keyGenerator).getPartitionPathFields().get(0)).get();
+        simplePartitionFieldDataType = 
structType.fields()[simplePartitionFieldIndex].dataType();
+      }
+    }
+  }
+
+  /**
+   * Instantiate {@link BuiltinKeyGenerator}.
+   *
+   * @param properties properties map.
+   * @return the key generator thus instantiated.
+   */
+  private BuiltinKeyGenerator getKeyGenerator(Properties properties) {
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.putAll(properties);
+    if 
(properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()).equals(NonpartitionedKeyGenerator.class.getName()))
 {
+      return null; // Do not instantiate NonPartitionKeyGen

Review comment:
       Please refrain from using `null` as return value. We have `Option` for 
that, without NPE business 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
##########
@@ -72,4 +75,16 @@ public String getPartitionPath(Row row) {
     return RowKeyGeneratorHelper.getPartitionPathFromRow(row, 
getPartitionPathFields(),
         hiveStylePartitioning, partitionPathPositions);
   }
+
+  @Override
+  public String getPartitionPath(InternalRow row, StructType structType) {
+    buildFieldDataTypesMapIfNeeded(structType);

Review comment:
       naive question. `InternalRow` does not have the schema with itself.? 
row.getSchema?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -377,6 +398,60 @@ object HoodieSparkSqlWriter {
     (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
   }
 
+  def bulkInsertAsRowNoMetaColumns(sqlContext: SQLContext,

Review comment:
       lets please unify this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Add support to disable meta column to BulkInsert Row Writer path
> ----------------------------------------------------------------
>
>                 Key: HUDI-2161
>                 URL: https://issues.apache.org/jira/browse/HUDI-2161
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: sivabalan narayanan
>            Priority: Major
>              Labels: pull-request-available
>
> Objective here is to disable all meta columns so as to avoid storage cost. 
> Also, some benefits could be seen in write latency with row writer path as no 
> special handling is required at RowCreateHandle layer. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to