codope commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1607649124


##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -123,10 +129,23 @@ public abstract ClosableIterator<T> getFileRecordIterator(
    * @param props        Properties.
    * @return The ordering value.
    */
-  public abstract Comparable getOrderingValue(Option<T> recordOption,
+  public Comparable getOrderingValue(Option<T> recordOption,

Review Comment:
   nit: indentation



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
+
+  private boolean copy;
+  private boolean isDeleted;

Review Comment:
   Make it `final`?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
+
+  private boolean copy;
+  private boolean isDeleted;
+
+  public boolean isDeleted() {
+    return isDeleted;
+  }
+
+  private final ArrayWritableObjectInspector objectInspector;
+
+  private final ObjectInspectorCache objectInspectorCache;
+
+  protected Schema schema;
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
ObjectInspectorCache objectInspectorCache) {
+    super(key, data);
+    this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+    this.objectInspectorCache = objectInspectorCache;
+    this.schema = schema;
+    this.copy = false;
+    isDeleted = data == null;
+  }
+
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HoodieOperation operation, boolean isCopy,
+                           ArrayWritableObjectInspector objectInspector, 
ObjectInspectorCache objectInspectorCache) {
+    super(key, data, operation, Option.empty());
+    this.schema = schema;
+    this.copy = isCopy;
+    isDeleted = data == null;
+    this.objectInspector = objectInspector;
+    this.objectInspectorCache = objectInspectorCache;
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance() {
+    return new HoodieHiveRecord(this.key, this.data, this.schema, 
this.operation, this.copy, this.objectInspector, this.objectInspectorCache);
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance(HoodieKey key, 
HoodieOperation op) {
+    throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance(HoodieKey key) {
+    throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+    String orderingField = ConfigUtils.getOrderingField(props);
+    if (orderingField == null) {
+      return 0;
+      //throw new IllegalArgumentException("Ordering Field is not set. 
Precombine must be set. (If you are using a custom record merger it might be 
something else)");
+    }
+    return (Comparable<?>) getValue(ConfigUtils.getOrderingField(props));
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.HIVE;
+  }
+
+  @Override
+  public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
+    throw new UnsupportedOperationException("Not supported for 
HoodieHiveRecord");

Review Comment:
   Shall we implement `getRecordKey`? It might be useful later. I guess we only 
need to call `getValue()` for the record key field, right?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator 
readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map<String,String[]> hosts;
+  protected final Map<String, TypeInfo> columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
+
+  private final List<String> partitionCols;
+  private final Set<String> partitionColSet;
+
+  private final String recordKeyField;
+  protected 
HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator 
readerCreator,
+                                    InputSplit split,
+                                    JobConf jobConf,
+                                    Reporter reporter,
+                                    Schema writerSchema,
+                                    Map<String,String[]> hosts,
+                                    HoodieTableMetaClient metaClient) {
+    this.readerCreator = readerCreator;
+    this.split = split;
+    this.jobConf = jobConf;
+    this.reporter = reporter;
+    this.writerSchema = writerSchema;
+    this.hosts = hosts;
+    this.partitionCols = 
HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
+            .filter(n -> writerSchema.getField(n) != 
null).collect(Collectors.toList());
+    this.partitionColSet = new HashSet<>(this.partitionCols);
+    String tableName = metaClient.getTableConfig().getTableName();
+    recordKeyField = metaClient.getTableConfig().populateMetaFields()
+        ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+        : assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
+    this.objectInspectorCache = 
HoodieArrayWritableAvroUtils.getCacheForTable(tableName, writerSchema, jobConf);
+    this.columnTypeMap = objectInspectorCache.getColumnTypeMap();
+  }
+
+  private static String assertSingleKey(Option<String[]> recordKeyFieldsOpt) {
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.isPresent(), "no record 
key field");
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.get().length == 1, "more 
than 1 record key, and not meta fields");
+    return recordKeyFieldsOpt.get()[0];
+  }
+
+  @Override
+  public FileSystem getFs(String path, Configuration conf) {
+    return FSUtils.getFs(path, conf);
+  }
+
+  @Override
+  public ClosableIterator<ArrayWritable> getFileRecordIterator(Path filePath, 
long start, long length, Schema dataSchema, Schema requiredSchema, 
Configuration conf) throws IOException {
+    JobConf jobConfCopy = new JobConf(jobConf);
+    //move the partition cols to the end, because in some cases it has issues 
if we don't do that
+    Schema modifiedDataSchema = 
HoodieAvroUtils.generateProjectionSchema(dataSchema, 
Stream.concat(dataSchema.getFields().stream()
+            .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> 
!partitionColSet.contains(n)),
+        partitionCols.stream().filter(c -> dataSchema.getField(c) != 
null)).collect(Collectors.toList()));
+    setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema);
+    InputSplit inputSplit = new FileSplit(filePath, start, length, 
hosts.get(filePath.toString()));
+    RecordReader<NullWritable, ArrayWritable> recordReader = 
readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter);
+    if (firstRecordReader == null) {
+      firstRecordReader = recordReader;
+    }
+    ClosableIterator<ArrayWritable> recordIterator = new 
RecordReaderValueIterator<>(recordReader);
+    if (modifiedDataSchema.equals(requiredSchema)) {
+      return  recordIterator;
+    }
+    //The record reader puts the required columns in the positions of the data 
schema and nulls the rest of the columns
+    return new CloseableMappingIterator<>(recordIterator, 
projectRecord(modifiedDataSchema, requiredSchema));
+  }
+
+  private void setSchemas(JobConf jobConf, Schema dataSchema, Schema 
requiredSchema) {
+    List<String> dataColumnNameList = dataSchema.getFields().stream().map(f -> 
f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList());
+    List<TypeInfo> dataColumnTypeList = 
dataColumnNameList.stream().map(fieldName -> {
+      TypeInfo type = columnTypeMap.get(fieldName);
+      if (type == null) {
+        throw new IllegalArgumentException("Field: " + fieldName + ", does not 
have a defined type");
+      }
+      return type;
+    }).collect(Collectors.toList());
+    jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", 
dataColumnNameList));
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, 
dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(",")));
+    //don't replace `f -> f.name()` with lambda reference
+    String readColNames = requiredSchema.getFields().stream().map(f -> 
f.name()).collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, 
readColNames);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, 
requiredSchema.getFields()
+        .stream().map(f -> 
String.valueOf(dataSchema.getField(f.name()).pos())).collect(Collectors.joining(",")));
+  }
+
+  @Override
+  public ArrayWritable convertAvroRecord(IndexedRecord avroRecord) {
+    //should be support timestamp?
+    return (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, 
avroRecord.getSchema(), false);
+  }
+
+  @Override
+  public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+    switch (mergerStrategy) {
+      case DEFAULT_MERGER_STRATEGY_UUID:
+        return new HoodieHiveRecordMerger();
+      default:
+        throw new HoodieException("The merger strategy UUID is not supported: 
" + mergerStrategy);

Review Comment:
   +1 — let's add the unsupported strategy UUID to the error message, and 
optionally replace the switch with an if-else. 



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+
+public class HoodieFileGroupReaderRecordReader implements 
RecordReader<NullWritable, ArrayWritable>  {
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> 
getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader<ArrayWritable> fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator<ArrayWritable> reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+                                           final InputSplit split,
+                                           final JobConf jobConf,
+                                           final Reporter reporter) throws 
IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new 
HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(jobConfCopy)
+        .setBasePath(tableBasePath)
+        .build();
+    String latestCommitTime = getLatestCommitTime(split, metaClient);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, 
latestCommitTime);
+    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+    Map<String, String[]> hosts = new HashMap<>();
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, 
jobConfCopy, reporter, tableSchema, hosts, metaClient);
+    this.arrayWritable = new ArrayWritable(Writable.class, new 
Writable[requestedSchema.getFields().size()]);
+    //get some config vals
+    long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), 
MAX_MEMORY_FOR_MERGE.defaultValue());
+    String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), 
FileIOUtils.getDefaultSpillableMapBasePath());
+    ExternalSpillableMap.DiskMapType spillMapType =  
ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
+        
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
+    boolean bitmaskCompressEnabled = 
jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+
+    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, 
jobConfCopy, tableBasePath,
+        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, 
readerContext.getFs(tableBasePath, jobConfCopy), tableBasePath),
+        tableSchema, requestedSchema, metaClient.getTableConfig().getProps(), 
metaClient.getTableConfig(), fileSplit.getStart(),
+        fileSplit.getLength(), false, maxMemoryForMerge, 
spillableMapPath,spillMapType, bitmaskCompressEnabled);
+    this.fileGroupReader.initRecordIterators();
+    //it expects the partition columns to be at the end
+    Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
+        Stream.concat(tableSchema.getFields().stream().map(f -> 
f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)),
+            partitionColumns.stream()).collect(Collectors.toList()));
+    this.reverseProjection = 
readerContext.reverseProjectRecord(requestedSchema, outputSchema);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
+    if (!fileGroupReader.hasNext()) {
+      return false;
+    }
+    value.set(fileGroupReader.next().get());
+    reverseProjection.apply(value);
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return nullWritable;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return arrayWritable;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return readerContext.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    fileGroupReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return readerContext.getProgress();
+  }
+
+  public RealtimeSplit getSplit() {
+    return (RealtimeSplit) inputSplit;
+  }
+
+  public JobConf getJobConf() {
+    return jobConfCopy;
+  }
+
+  public static List<String> getPartitionFieldNames(JobConf jobConf) {

Review Comment:
   Let's move the static methods to `HoodieInputFormatUtils` for better 
reusability.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+
+public class HoodieFileGroupReaderRecordReader implements 
RecordReader<NullWritable, ArrayWritable>  {
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> 
getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader<ArrayWritable> fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator<ArrayWritable> reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+                                           final InputSplit split,
+                                           final JobConf jobConf,
+                                           final Reporter reporter) throws 
IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new 
HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(jobConfCopy)
+        .setBasePath(tableBasePath)
+        .build();
+    String latestCommitTime = getLatestCommitTime(split, metaClient);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, 
latestCommitTime);
+    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+    Map<String, String[]> hosts = new HashMap<>();
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, 
jobConfCopy, reporter, tableSchema, hosts, metaClient);

Review Comment:
   Looks like this will be created per split, even though the context should not 
change at the split level. If we are not using the split in the context in any 
way, can we make it a singleton, or use a cache keyed at the metaClient level?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {

Review Comment:
   javadoc please



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable> {
  // Factory used to open the underlying Hive record reader for a split.
  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator;
  protected final InputSplit split;
  protected final JobConf jobConf;
  protected final Reporter reporter;
  // Table/writer schema this context reads against.
  protected final Schema writerSchema;
  // File -> preferred host locations; supplied by the caller via the constructor.
  protected Map<String,String[]> hosts;
  // Column name -> Hive type info, taken from the object inspector cache.
  protected final Map<String, TypeInfo> columnTypeMap;
  private final ObjectInspectorCache objectInspectorCache;
  // First reader opened by this context; usage not visible here — presumably for
  // position/progress reporting. TODO(review): confirm.
  private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;

  // Partition fields, restricted to those actually present in writerSchema.
  private final List<String> partitionCols;
  private final Set<String> partitionColSet;

  // Meta record-key field when meta fields are populated; otherwise the single
  // configured record key field (see constructor).
  private final String recordKeyField;
  /**
   * Creates a reader context bound to one input split.
   *
   * @param readerCreator factory that opens the underlying Hive record reader
   * @param split         the input split being read
   * @param jobConf       job configuration for this read
   * @param reporter      Hadoop progress reporter
   * @param writerSchema  table schema used for reading
   * @param hosts         map of file path to host locations, owned by the caller
   * @param metaClient    Hudi table meta client used for table-config lookups
   */
  protected HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator,
                                    InputSplit split,
                                    JobConf jobConf,
                                    Reporter reporter,
                                    Schema writerSchema,
                                    Map<String,String[]> hosts,
                                    HoodieTableMetaClient metaClient) {
    this.readerCreator = readerCreator;
    this.split = split;
    this.jobConf = jobConf;
    this.reporter = reporter;
    this.writerSchema = writerSchema;
    this.hosts = hosts;
    // Keep only the partition fields that actually exist in the writer schema.
    this.partitionCols = HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
            .filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
    this.partitionColSet = new HashSet<>(this.partitionCols);
    String tableName = metaClient.getTableConfig().getTableName();
    // With populated meta fields the key lives in _hoodie_record_key; otherwise
    // exactly one record-key field must be configured (enforced by assertSingleKey).
    recordKeyField = metaClient.getTableConfig().populateMetaFields()
        ? HoodieRecord.RECORD_KEY_METADATA_FIELD
        : assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
    this.objectInspectorCache = HoodieArrayWritableAvroUtils.getCacheForTable(tableName, writerSchema, jobConf);
    this.columnTypeMap = objectInspectorCache.getColumnTypeMap();
  }
+
+  /**
+   * If populate meta fields is false, then getRecordKeyFields()
+   * should return exactly 1 recordkey field.
+   */
+  private static String assertSingleKey(Option<String[]> recordKeyFieldsOpt) {

Review Comment:
   rename to `getRecordKeyField` and subsume the populateMetaFields check 
within the method. 



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+
+public class HoodieFileGroupReaderRecordReader implements 
RecordReader<NullWritable, ArrayWritable>  {

Review Comment:
   rename to `HoodieFileGroupReaderBasedRecordReader` and add javadoc please.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java:
##########
@@ -146,43 +147,50 @@ public void testWriteDuringCompaction(String 
payloadClass) throws IOException {
   @ParameterizedTest
   @MethodSource("writeLogTest")
   public void testWriteLogDuringCompaction(boolean enableMetadataTable, 
boolean enableTimelineServer) throws IOException {
-    Properties props = getPropertiesForKeyGen(true);
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
-        .forTable("test-trip-table")
-        .withPath(basePath())
-        .withSchema(TRIP_EXAMPLE_SCHEMA)
-        .withParallelism(2, 2)
-        .withAutoCommit(true)
-        .withEmbeddedTimelineServerEnabled(enableTimelineServer)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
-            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
-            
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
-        .build();
-    props.putAll(config.getProps());
-
-    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
-    client = getHoodieWriteClient(config);
-
-    final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
-    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+    try {
+      //disable for this test because it seems like we process mor in a 
different order?

Review Comment:
   sorry, what do you mean by this?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+
+public class HoodieFileGroupReaderRecordReader implements 
RecordReader<NullWritable, ArrayWritable>  {
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> 
getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
  private final HiveHoodieReaderContext readerContext;
  private final HoodieFileGroupReader<ArrayWritable> fileGroupReader;
  // Reusable value holder handed out by createValue(); sized to the requested schema.
  private final ArrayWritable arrayWritable;
  // Keys are unused; createKey() always returns this singleton.
  private final NullWritable nullWritable = NullWritable.get();
  private final InputSplit inputSplit;
  // Defensive copy of the incoming JobConf; this reader mutates projections on it.
  private final JobConf jobConfCopy;
  // Applied to every record in next() to put columns back in the order Hive expects.
  private final UnaryOperator<ArrayWritable> reverseProjection;
+
  /**
   * Builds a record reader that serves merged records for the file group backing
   * the given split, using the new file group reader.
   *
   * @param readerCreator factory that opens the underlying Hive record reader
   * @param split         the input split (must be a {@link FileSplit})
   * @param jobConf       job configuration; copied before mutation
   * @param reporter      Hadoop progress reporter
   * @throws IOException if the table path or file slice cannot be resolved
   */
  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
                                           final InputSplit split,
                                           final JobConf jobConf,
                                           final Reporter reporter) throws IOException {
    // Work on a copy so projection cleanup does not leak into the caller's conf.
    this.jobConfCopy = new JobConf(jobConf);
    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy));
    this.inputSplit = split;

    FileSplit fileSplit = (FileSplit) split;
    String tableBasePath = getTableBasePath(split, jobConfCopy);
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(jobConfCopy)
        .setBasePath(tableBasePath)
        .build();
    // Read up to the latest commit visible for this split.
    String latestCommitTime = getLatestCommitTime(split, metaClient);
    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime);
    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
    // Filled with file -> host locations by getFileSliceFromSplit below.
    Map<String, String[]> hosts = new HashMap<>();
    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConfCopy, reporter, tableSchema, hosts, metaClient);
    this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]);
    // Spillable-map / merge-memory settings, read from the job conf with defaults.
    long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), MAX_MEMORY_FOR_MERGE.defaultValue());
    String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath());
    ExternalSpillableMap.DiskMapType spillMapType = ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
        SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
    boolean bitmaskCompressEnabled = jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());

    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, jobConfCopy, tableBasePath,
        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, readerContext.getFs(tableBasePath, jobConfCopy), tableBasePath),
        tableSchema, requestedSchema, metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(),
        fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath, spillMapType, bitmaskCompressEnabled);
    this.fileGroupReader.initRecordIterators();
    // The output schema expects partition columns at the end: non-partition fields
    // first (lower-cased names), then the partition columns.
    Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
        Stream.concat(tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)),
            partitionColumns.stream()).collect(Collectors.toList()));
    this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, outputSchema);
  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
+    if (!fileGroupReader.hasNext()) {
+      return false;
+    }
+    value.set(fileGroupReader.next().get());
+    reverseProjection.apply(value);
+    return true;
+  }
+
  @Override
  public NullWritable createKey() {
    // Keys are unused; always return the shared NullWritable singleton.
    return nullWritable;
  }

  @Override
  public ArrayWritable createValue() {
    // Reusable value holder sized to the requested schema (see constructor).
    return arrayWritable;
  }

  /** Current read position, delegated to the reader context. */
  @Override
  public long getPos() throws IOException {
    return readerContext.getPos();
  }

  /** Closes the underlying file group reader and its resources. */
  @Override
  public void close() throws IOException {
    fileGroupReader.close();
  }

  /** Read progress, delegated to the reader context. */
  @Override
  public float getProgress() throws IOException {
    return readerContext.getProgress();
  }

  // NOTE(review): unconditional cast — this throws ClassCastException if the
  // split is not a RealtimeSplit, even though the constructor accepts any
  // FileSplit. Confirm callers only invoke this for realtime splits.
  public RealtimeSplit getSplit() {
    return (RealtimeSplit) inputSplit;
  }

  /** @return the defensive copy of the job conf this reader operates on. */
  public JobConf getJobConf() {
    return jobConfCopy;
  }
+
+  public static List<String> getPartitionFieldNames(JobConf jobConf) {
+    String partitionFields = 
jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+    return partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+        : new ArrayList<>();
+  }
+
  /**
   * Resolves the table's Avro schema as of the given commit time and appends
   * the partition fields so resulting rows carry (possibly null) partition values.
   *
   * @throws RuntimeException if the schema cannot be resolved
   */
  private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, JobConf jobConf, String latestCommitTime) {
    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
    try {
      Schema schema = tableSchemaResolver.getTableAvroSchema(latestCommitTime);
      // Add partitioning fields to writer schema for resulting row to contain null values for these fields
      return HoodieRealtimeRecordReaderUtils.addPartitionFields(schema, getPartitionFieldNames(jobConf));
    } catch (Exception e) {
      throw new RuntimeException("Unable to get table schema", e);
    }
  }
+
+  public static String getTableBasePath(InputSplit split, JobConf jobConf) 
throws IOException {
+    if (split instanceof RealtimeSplit) {
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      return realtimeSplit.getBasePath();
+    } else {
+      Path inputPath = ((FileSplit)split).getPath();
+      FileSystem fs =  inputPath.getFileSystem(jobConf);
+      Option<Path> tablePath = TablePathUtils.getTablePath(fs, inputPath);
+      return tablePath.get().toString();
+    }
+  }
+
+  private static String getLatestCommitTime(InputSplit split, 
HoodieTableMetaClient metaClient) {
+    if (split instanceof RealtimeSplit) {
+      return ((RealtimeSplit) split).getMaxCommitTime();
+    }
+    Option<HoodieInstant> lastInstant = 
metaClient.getCommitsTimeline().lastInstant();
+    if (lastInstant.isPresent()) {
+      return lastInstant.get().getTimestamp();
+    } else {
+      return "";
+    }
+  }
+
+  /**
+   * Convert FileSplit to FileSlice, but save the locations in 'hosts' because 
that data is otherwise lost.
+   */
+  private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, 
String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {

Review Comment:
   let's move to a util and UT this method.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+
+public class HoodieFileGroupReaderRecordReader implements 
RecordReader<NullWritable, ArrayWritable>  {
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> 
getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader<ArrayWritable> fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator<ArrayWritable> reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+                                           final InputSplit split,
+                                           final JobConf jobConf,
+                                           final Reporter reporter) throws 
IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new 
HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(jobConfCopy)
+        .setBasePath(tableBasePath)
+        .build();
+    String latestCommitTime = getLatestCommitTime(split, metaClient);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, 
latestCommitTime);
+    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+    Map<String, String[]> hosts = new HashMap<>();
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, 
jobConfCopy, reporter, tableSchema, hosts, metaClient);
+    this.arrayWritable = new ArrayWritable(Writable.class, new 
Writable[requestedSchema.getFields().size()]);
+    //get some config vals
+    long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), 
MAX_MEMORY_FOR_MERGE.defaultValue());
+    String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), 
FileIOUtils.getDefaultSpillableMapBasePath());
+    ExternalSpillableMap.DiskMapType spillMapType =  
ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
+        
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
+    boolean bitmaskCompressEnabled = 
jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+
+    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, 
jobConfCopy, tableBasePath,
+        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, 
readerContext.getFs(tableBasePath, jobConfCopy), tableBasePath),
+        tableSchema, requestedSchema, metaClient.getTableConfig().getProps(), 
metaClient.getTableConfig(), fileSplit.getStart(),
+        fileSplit.getLength(), false, maxMemoryForMerge, 
spillableMapPath,spillMapType, bitmaskCompressEnabled);
+    this.fileGroupReader.initRecordIterators();
+    //it expects the partition columns to be at the end
+    Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
+        Stream.concat(tableSchema.getFields().stream().map(f -> 
f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)),
+            partitionColumns.stream()).collect(Collectors.toList()));
+    this.reverseProjection = 
readerContext.reverseProjectRecord(requestedSchema, outputSchema);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
+    if (!fileGroupReader.hasNext()) {
+      return false;
+    }
+    value.set(fileGroupReader.next().get());
+    reverseProjection.apply(value);
+    return true;
+  }
+
@Override
public NullWritable createKey() {
  // Keys are unused by this reader; hand back the shared NullWritable instance.
  return nullWritable;
}
+
@Override
public ArrayWritable createValue() {
  // Old-style Hadoop RecordReader contract: return a reusable value object that
  // next() fills in. The same ArrayWritable instance (sized to the requested
  // schema in the constructor) is reused across calls.
  return arrayWritable;
}
+
@Override
public long getPos() throws IOException {
  // Delegates to the reader context; exact position semantics depend on the
  // underlying reader implementation.
  return readerContext.getPos();
}
+
@Override
public void close() throws IOException {
  // Releases the file group reader and whatever resources (e.g. spillable map
  // state) it holds.
  fileGroupReader.close();
}
+
@Override
public float getProgress() throws IOException {
  // Delegates progress reporting to the reader context.
  return readerContext.getProgress();
}
+
public RealtimeSplit getSplit() {
  // NOTE(review): unchecked cast — callers must only invoke this for realtime
  // (MOR) splits; a plain FileSplit (COW) would throw ClassCastException here.
  return (RealtimeSplit) inputSplit;
}
+
public JobConf getJobConf() {
  // Returns the copy of the job conf this reader was configured with, not the
  // original passed in by the framework.
  return jobConfCopy;
}
+
+  public static List<String> getPartitionFieldNames(JobConf jobConf) {
+    String partitionFields = 
jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+    return partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+        : new ArrayList<>();
+  }
+
+  private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, 
JobConf jobConf, String latestCommitTime) {
+    TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+    try {
+      Schema schema = tableSchemaResolver.getTableAvroSchema(latestCommitTime);
+      // Add partitioning fields to writer schema for resulting row to contain 
null values for these fields
+      return HoodieRealtimeRecordReaderUtils.addPartitionFields(schema, 
getPartitionFieldNames(jobConf));
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to get table schema", e);
+    }
+  }
+
+  public static String getTableBasePath(InputSplit split, JobConf jobConf) 
throws IOException {
+    if (split instanceof RealtimeSplit) {
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      return realtimeSplit.getBasePath();
+    } else {
+      Path inputPath = ((FileSplit)split).getPath();
+      FileSystem fs =  inputPath.getFileSystem(jobConf);
+      Option<Path> tablePath = TablePathUtils.getTablePath(fs, inputPath);
+      return tablePath.get().toString();
+    }
+  }
+
+  private static String getLatestCommitTime(InputSplit split, 
HoodieTableMetaClient metaClient) {
+    if (split instanceof RealtimeSplit) {
+      return ((RealtimeSplit) split).getMaxCommitTime();
+    }
+    Option<HoodieInstant> lastInstant = 
metaClient.getCommitsTimeline().lastInstant();
+    if (lastInstant.isPresent()) {
+      return lastInstant.get().getTimestamp();
+    } else {
+      return "";
+    }
+  }
+
+  /**
+   * Convert FileSplit to FileSlice, but save the locations in 'hosts' because 
that data is otherwise lost.
+   */
+  private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, 
String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
+    BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs);
+    if (split instanceof RealtimeSplit) {
+      //mor
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      boolean isLogFile = FSUtils.isLogFile(realtimeSplit.getPath());
+      String fileID;
+      String commitTime;
+      if (isLogFile) {
+        fileID = FSUtils.getFileIdFromLogPath(realtimeSplit.getPath());
+        commitTime = 
FSUtils.getDeltaCommitTimeFromLogPath(realtimeSplit.getPath());
+      } else {
+        fileID = FSUtils.getFileId(realtimeSplit.getPath().getName());
+        commitTime = FSUtils.getCommitTime(realtimeSplit.getPath().toString());
+      }
+      HoodieFileGroupId fileGroupId = new 
HoodieFileGroupId(FSUtils.getPartitionPath(realtimeSplit.getBasePath(),
+          realtimeSplit.getPath().getParent().toString()).toString(), fileID);
+      if (isLogFile) {
+        return new FileSlice(fileGroupId, commitTime, null, 
realtimeSplit.getDeltaLogFiles());
+      }
+      hosts.put(realtimeSplit.getPath().toString(), 
realtimeSplit.getLocations());
+      HoodieBaseFile hoodieBaseFile = new 
HoodieBaseFile(fs.getFileStatus(realtimeSplit.getPath()), bootstrapBaseFile);
+      return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, 
realtimeSplit.getDeltaLogFiles());
+    }
+    //cow
+    HoodieFileGroupId fileGroupId = new 
HoodieFileGroupId(FSUtils.getFileId(split.getPath().getName()),
+        FSUtils.getPartitionPath(tableBasePath, 
split.getPath().getParent().toString()).toString());
+    hosts.put(split.getPath().toString(), split.getLocations());
+    return new FileSlice(fileGroupId, 
FSUtils.getCommitTime(split.getPath().toString()), new 
HoodieBaseFile(fs.getFileStatus(split.getPath()), bootstrapBaseFile), 
Collections.emptyList());
+  }
+
+  private static BaseFile createBootstrapBaseFile(FileSplit split, Map<String, 
String[]> hosts, FileSystem fs) throws IOException {
+    if (split instanceof BootstrapBaseFileSplit) {
+      BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit) 
split;
+      FileSplit bootstrapFileSplit = 
bootstrapBaseFileSplit.getBootstrapFileSplit();
+      hosts.put(bootstrapFileSplit.getPath().toString(), 
bootstrapFileSplit.getLocations());
+      return new BaseFile(fs.getFileStatus(bootstrapFileSplit.getPath()));
+    }
+    return null;
+  }
+
+  private static Schema createRequestedSchema(Schema tableSchema, JobConf 
jobConf) {
+    String readCols = 
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (StringUtils.isNullOrEmpty(readCols)) {
+      Schema emptySchema = Schema.createRecord(tableSchema.getName(), 
tableSchema.getDoc(),
+          tableSchema.getNamespace(), tableSchema.isError());
+      emptySchema.setFields(Collections.emptyList());
+      return emptySchema;
+    }
+    //hive will handle the partition cols

Review Comment:
   Not sure I follow this part. Do you mean that Hive will add the partition columns automatically even if we don't provide them in the requested schema?
   Also, let's move this method to a util and add UT as well.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable> {

Review Comment:
   `getRecordReader` should be called once throughout the lifecycle of the 
query. You can verify using some e2e test. Or add a debug log and ensure the 
reader context is created once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to