vinothchandar commented on a change in pull request #3315:
URL: https://github.com/apache/hudi/pull/3315#discussion_r677872797



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
##########
@@ -60,7 +60,7 @@ public static String 
getRecordKeyFromGenericRecord(GenericRecord genericRecord,
    * @return the partition path for the passed in generic record.
    */
   public static String getPartitionPathFromGenericRecord(GenericRecord 
genericRecord, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    return keyGeneratorOpt.isPresent() ? 
keyGeneratorOpt.get().getRecordKey(genericRecord) : 
genericRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();

Review comment:
       oops

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -210,8 +211,11 @@ protected String getCommitActionType() {
               .build();
 
           
recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, 
scanner, readerSchema,
-              table.getMetaClient().getTableConfig().getPayloadClass()));
+              table.getMetaClient().getTableConfig().getPayloadClass(),
+              table.getMetaClient().getTableConfig().populateMetaFields() ? 
Option.empty() : 
Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp()),
+              table.getMetaClient().getTableConfig().populateMetaFields() ? 
Option.empty() : 
Option.of(table.getMetaClient().getTableConfig().getPartitionFieldProp())));

Review comment:
       I feel if we had a `Option.ofBoolean()` where it returns a Option.empty 
or the value to further map, this will read much nicer. 
   
   
`Option.ofBoolean(tableConfig.populateMetaFields).map(tableConfig::getPartitionFieldProp)`
 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
##########
@@ -110,8 +110,15 @@ public static long generateChecksum(byte[] data) {
    * Utility method to convert bytes to HoodieRecord using schema and payload 
class.
    */
   public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String 
payloadClazz) {
-    String recKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-    String partitionPath = 
rec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+    return convertToHoodieRecordPayload(rec, payloadClazz, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD);

Review comment:
       can't you pass these as default at a higher layer. So there will only be 
one method here, which takes 4 args. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -324,6 +324,14 @@ public void validateTableProperties(Properties properties, 
WriteOperationType op
         && Boolean.parseBoolean((String) 
properties.getOrDefault(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), 
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()))) {
       throw new 
HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key() + " already 
disabled for the table. Can't be re-enabled back");
     }
+
+    // meta fields can be disabled only with SimpleKeyGenerator
+    if (!getTableConfig().populateMetaFields()
+        && 
!properties.getProperty(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key(),
 HoodieTableConfig.DEFAULT_HOODIE_TABLE_KEY_GENERATOR_CLASS)

Review comment:
       just use `org.apache.hudi.keygen.SimpleKeyGenerator.class.getName` or 
something always as opposed to using strings to hold the class name. the 
advantage is that the IDE can find these usages easily

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -82,6 +82,13 @@
   public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
   public static final String HOODIE_READ_COLUMNS_PROP = 
"hoodie.read.columns.set";
 
+  public static final String HOODIE_USE_META_FIELDS = "hoodie.use.meta.fields";

Review comment:
       but its a table level property and the query has the understandin that 
its about data is stored, not just about how its written. I would advise 
against having a different name for the same thing. it leads to more harm than 
good over the long run

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -232,21 +253,30 @@ public void testSimpleInsertAndUpdateHFile() throws 
Exception {
     }
   }
 
-  @Test
-  public void testSimpleClusteringNoUpdates() throws Exception {
-    testClustering(false);
+  @ParameterizedTest
+  @MethodSource("populateMetaFieldsParams")
+  public void testSimpleClusteringNoUpdates(boolean populateMetaFields) throws 
Exception {
+    clean();
+    init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), 
populateMetaFields);
+    testClustering(false, populateMetaFields);
   }
 
-  @Test
-  public void testSimpleClusteringWithUpdates() throws Exception {
-    testClustering(true);
+  @ParameterizedTest
+  @MethodSource("populateMetaFieldsParams")
+  public void testSimpleClusteringWithUpdates(boolean populateMetaFields) 
throws Exception {
+    clean();
+    init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), 
populateMetaFields);
+    testClustering(true, populateMetaFields);
   }
 
-  private void testClustering(boolean doUpdates) throws Exception {
+  private void testClustering(boolean doUpdates, boolean populateMetaFields) 
throws Exception {
     // set low compaction small File Size to generate more file groups.
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
-    HoodieWriteConfig cfg = getConfigBuilder(true, 10L, 
clusteringConfig).build();
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, 10L, 
clusteringConfig);
+    addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields);

Review comment:
       rename `addConfigsForPopulateMetaFields` 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
##########
@@ -302,7 +310,12 @@ private void processDataBlock(HoodieDataBlock dataBlock) 
throws Exception {
   }
 
   protected HoodieRecord<?> createHoodieRecord(IndexedRecord rec) {
-    return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, 
this.payloadClassFQN);
+    if (!simpleRecordKeyFieldOpt.isPresent()) {

Review comment:
       please try to use Option.map.orElse. style of writing these. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -68,6 +68,7 @@
   private static final Logger LOG = 
LogManager.getLogger(HoodieTableConfig.class);
 
   public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
+  public static final String DEFAULT_HOODIE_TABLE_KEY_GENERATOR_CLASS = 
"org.apache.hudi.keygen.SimpleKeyGenerator";

Review comment:
       call this the default is very confusing. 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
##########
@@ -171,6 +171,33 @@ public static long countRecordsSince(JavaSparkContext jsc, 
String basePath, SQLC
     }
   }
 
+  /**
+   * Obtain all new data written into the Hoodie table since the given 
timestamp.
+   */
+  public static long countAllRecords(JavaSparkContext jsc, String basePath, 
SQLContext sqlContext,

Review comment:
       lets reuse some helper or place this in the right helper test utils 
class.

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -82,6 +82,13 @@
   public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
   public static final String HOODIE_READ_COLUMNS_PROP = 
"hoodie.read.columns.set";
 
+  public static final String HOODIE_USE_META_FIELDS = "hoodie.use.meta.fields";
+  public static final String DEFAULT_HOODIE_USE_META_FIELDS = "true";
+  public static final String RECORD_KEY_FIELD = "hoodie.record.key.field";

Review comment:
       I think we should just reuse from `HoodieTableConfig` and not redefine 
all of these again

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -98,7 +99,7 @@ public 
SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
 
     JavaRDD<WriteStatus>[] writeStatuses = 
convertStreamToArray(writeStatusRDDStream);
     JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
-    
+

Review comment:
       can we avoid these whitespace changes?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -52,29 +55,37 @@
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> 
deltaRecordMap;
 
   private final Set<String> deltaRecordKeys;
+  private int recordKeyIndex = 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
   private Iterator<String> deltaItr;
 
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException 
{
     super(split, job);
     this.parquetReader = realReader;
-    this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
+    this.deltaRecordMap = initAndGetMergedLogRecordScanner().getRecords();
     this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
   }
 
   /**
    * Goes through the log files and populates a map with latest version of 
each key logged, since the base split was
    * written.
    */
-  private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws 
IOException {
+  private HoodieMergedLogRecordScanner initAndGetMergedLogRecordScanner() 
throws IOException {
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an 
in-flight commit
     // but can return records for completed commits > the commit we are trying 
to read (if using
     // readCommit() API)
+    FileSystem fs = FSUtils.getFs(split.getPath().toString(), jobConf);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(split.getBasePath()).build();
+    Option<String> simpleRecordKeyFieldOpt = 
metaClient.getTableConfig().populateMetaFields() ? Option.empty() : 
Option.of(metaClient.getTableConfig().getRecordKeyFieldProp());

Review comment:
       same thing. this ternary operator use with an Option is kind of bugging 
me. makes lines longer than what they need to be.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -132,17 +140,27 @@ public void init(HoodieFileFormat baseFileFormat) throws 
IOException {
 
   @BeforeEach
   public void init() throws IOException {
-    init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue());
+    init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), true);
   }
 
   @AfterEach
   public void clean() throws IOException {
     cleanupResources();
   }
 
-  @Test
-  public void testSimpleInsertAndUpdate() throws Exception {
-    HoodieWriteConfig cfg = getConfig(true);
+  private static Stream<Arguments> populateMetaFieldsParams() {

Review comment:
       whats the increase in runtime to support this?  I wonder if we need to 
test every method with this combination.

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -206,16 +206,28 @@ private static Configuration 
addProjectionField(Configuration conf, String field
 
   public static void addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
-    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
-    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
-    addProjectionField(configuration, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
+    if (configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS) == 
null || configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS)

Review comment:
       so why do we ned this if we are setting this already at the record 
reader level?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -80,6 +85,27 @@ protected HoodieDefaultTimeline 
filterInstantsTimeline(HoodieDefaultTimeline tim
   }
 
   void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf 
jobConf) {
+
+    // set appropriate configs in jobConf to handle disabling of meta fields 
within canAddProjectionToJobConf and addRequiredProjectionFields
+    if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
+      // inject actual field info if meta fields are disabled
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build();
+      HoodieTableConfig tableConfig = metaClient.getTableConfig();
+      if (!tableConfig.populateMetaFields()) {
+        TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);

Review comment:
       same here. this is the problem of doing this at the record reader level, 
as opposed to `getSplits()`

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -324,6 +324,14 @@ public void validateTableProperties(Properties properties, 
WriteOperationType op
         && Boolean.parseBoolean((String) 
properties.getOrDefault(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), 
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()))) {
       throw new 
HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key() + " already 
disabled for the table. Can't be re-enabled back");
     }
+
+    // meta fields can be disabled only with SimpleKeyGenerator
+    if (!getTableConfig().populateMetaFields()
+        && 
!properties.getProperty(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key(),
 HoodieTableConfig.DEFAULT_HOODIE_TABLE_KEY_GENERATOR_CLASS)
+        .equals(HoodieTableConfig.DEFAULT_HOODIE_TABLE_KEY_GENERATOR_CLASS)) {
+      throw new HoodieException("Only SimpleKeyGenerators are supported when 
meta fields are disabled. KeyGenerator used : "

Review comment:
       `SimpleKeyGenerators` this is not a class. so its confusing when we 
camel case it. Can we say `simple key generator` 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -206,16 +206,28 @@ private static Configuration 
addProjectionField(Configuration conf, String field
 
   public static void addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
-    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
-    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
-    addProjectionField(configuration, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
+    if (configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS) == 
null || configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS)
+        .equals(HoodieInputFormatUtils.DEFAULT_HOODIE_USE_META_FIELDS)) {
+      addProjectionField(configuration, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
+      addProjectionField(configuration, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
+      addProjectionField(configuration, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
+    } else {
+      addProjectionField(configuration, 
configuration.get(HoodieInputFormatUtils.RECORD_KEY_FIELD), 
Integer.parseInt(configuration.get(HoodieInputFormatUtils.RECORD_KEY_FIELD_INDEX)));
+      addProjectionField(configuration, 
configuration.get(HoodieInputFormatUtils.PARTITION_PATH_FIELD), 
Integer.parseInt(configuration.get(HoodieInputFormatUtils.PARTITION_PATH_FIELD_INDEX)));
+    }
   }
 
   public static boolean requiredProjectionFieldsExistInConf(Configuration 
configuration) {
     String readColNames = 
configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
-    return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-        && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    if (configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS) == 
null || configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS)
+        .equals(HoodieInputFormatUtils.DEFAULT_HOODIE_USE_META_FIELDS)) {
+      return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+          && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+          && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    } else {
+      return readColNames.contains(HoodieInputFormatUtils.RECORD_KEY_FIELD)

Review comment:
       we don't validate the commit time fields?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -52,29 +55,37 @@
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> 
deltaRecordMap;
 
   private final Set<String> deltaRecordKeys;
+  private int recordKeyIndex = 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
   private Iterator<String> deltaItr;
 
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException 
{
     super(split, job);
     this.parquetReader = realReader;
-    this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
+    this.deltaRecordMap = initAndGetMergedLogRecordScanner().getRecords();
     this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
   }
 
   /**
    * Goes through the log files and populates a map with latest version of 
each key logged, since the base split was
    * written.
    */
-  private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws 
IOException {
+  private HoodieMergedLogRecordScanner initAndGetMergedLogRecordScanner() 
throws IOException {
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an 
in-flight commit
     // but can return records for completed commits > the commit we are trying 
to read (if using
     // readCommit() API)
+    FileSystem fs = FSUtils.getFs(split.getPath().toString(), jobConf);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(split.getBasePath()).build();

Review comment:
       same. would be good to avoid these extra reads. these are incurred 
regardless of virtual keys or not, which is problematic.

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -206,16 +206,28 @@ private static Configuration 
addProjectionField(Configuration conf, String field
 
   public static void addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
-    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
-    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
-    addProjectionField(configuration, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
+    if (configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS) == 
null || configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS)
+        .equals(HoodieInputFormatUtils.DEFAULT_HOODIE_USE_META_FIELDS)) {
+      addProjectionField(configuration, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
+      addProjectionField(configuration, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
+      addProjectionField(configuration, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
+    } else {
+      addProjectionField(configuration, 
configuration.get(HoodieInputFormatUtils.RECORD_KEY_FIELD), 
Integer.parseInt(configuration.get(HoodieInputFormatUtils.RECORD_KEY_FIELD_INDEX)));
+      addProjectionField(configuration, 
configuration.get(HoodieInputFormatUtils.PARTITION_PATH_FIELD), 
Integer.parseInt(configuration.get(HoodieInputFormatUtils.PARTITION_PATH_FIELD_INDEX)));
+    }
   }
 
   public static boolean requiredProjectionFieldsExistInConf(Configuration 
configuration) {
     String readColNames = 
configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
-    return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-        && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    if (configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS) == 
null || configuration.get(HoodieInputFormatUtils.HOODIE_USE_META_FIELDS)

Review comment:
       is there a get(k, defaultValue) that we can use?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
##########
@@ -36,11 +38,14 @@
   private Iterator<HoodieRecord<? extends HoodieRecordPayload>> 
recordsIterator;
 
   public static <R extends IndexedRecord, T extends HoodieRecordPayload> 
HoodieFileSliceReader getFileSliceReader(
-      HoodieFileReader<R> baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, String payloadClass) throws IOException {
+      HoodieFileReader<R> baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, String payloadClass,
+      Option<String> simpleRecordKeyFieldOpt, Option<String> 
simplePartitionPathFieldOpt) throws IOException {

Review comment:
       if you can pick a key field name in the higher level i.e a custom field 
or `_hoodie_record_key`, then thats the best option. We don't even need the if 
in L48. 
   but if you cannot, then yeah may be its better add all three. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -210,8 +211,11 @@ protected String getCommitActionType() {
               .build();
 
           
recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, 
scanner, readerSchema,
-              table.getMetaClient().getTableConfig().getPayloadClass()));
+              table.getMetaClient().getTableConfig().getPayloadClass(),

Review comment:
       fetch `table.getMetaClient().getTableConfig()` into a variable and then 
reuse? the code will reach much nicer IMHO

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -80,6 +85,27 @@ protected HoodieDefaultTimeline 
filterInstantsTimeline(HoodieDefaultTimeline tim
   }
 
   void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf 
jobConf) {
+
+    // set appropriate configs in jobConf to handle disabling of meta fields 
within canAddProjectionToJobConf and addRequiredProjectionFields
+    if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
+      // inject actual field info if meta fields are disabled
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build();

Review comment:
       this is an extra read of the `.hoodie`, from each record reader? This is 
a problem.  can we avoid this. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -532,8 +539,8 @@ object HoodieSparkSqlWriter {
                                              tableConfig: HoodieTableConfig,
                                              jsc: JavaSparkContext,
                                              tableInstantInfo: TableInstantInfo
-                                             ): (Boolean, 
common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
-    if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
+                                            ): (Boolean, 
common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {

Review comment:
       can you please fix IDE to just format the lines you touch?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
##########
@@ -80,6 +80,10 @@
   private final HoodieTableMetaClient hoodieTableMetaClient;
   // Merge strategy to use when combining records from log
   private final String payloadClassFQN;
+  // simple recordKey field
+  private Option<String> simpleRecordKeyFieldOpt = Option.empty();

Review comment:
       sg. I see that one member having the `Opt` suffix, the other does not. I 
prefer we keep it `simplePartitionPathField` and `simpleRecordKeyField`

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -68,6 +68,7 @@
   private static final Logger LOG = 
LogManager.getLogger(HoodieTableConfig.class);
 
   public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
+  public static final String DEFAULT_HOODIE_TABLE_KEY_GENERATOR_CLASS = 
"org.apache.hudi.keygen.SimpleKeyGenerator";

Review comment:
       Can we get rid of this variable here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to