This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new d6ef83a3b379 [MINOR] Add optimisation for repair with logical 
timestamp (#18478)
d6ef83a3b379 is described below

commit d6ef83a3b379f469da57530a95b80220d350701a
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Apr 10 09:21:55 2026 +0530

    [MINOR] Add optimisation for repair with logical timestamp (#18478)
    
    This PR adds an optimisation to the merged read handle so that the schema is 
repaired only if it has a timestamp-millis field. The existence of a 
timestamp-millis field is computed on the driver.
    It also addresses review comments from PR #18132 for the 0.15.1 version so 
that the repair is applied only when a timestamp-millis field is present in 
the schema.
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  6 +++-
 .../org/apache/hudi/io/HoodieMergedReadHandle.java | 21 ++++++-------
 .../metadata/HoodieBackedTableMetadataWriter.java  |  9 ++++--
 .../table/action/commit/HoodieMergeHelper.java     |  8 +++--
 .../hudi/table/action/compact/HoodieCompactor.java |  7 +++++
 .../hudi/io/storage/HoodieSparkParquetReader.java  |  6 ++--
 .../commit/BaseSparkCommitActionExecutor.java      | 19 ++++++++++++
 .../apache/hudi/io/TestHoodieMergedReadHandle.java |  8 ++++-
 .../apache/parquet/schema/AvroSchemaRepair.java    |  1 +
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 34 ++++++++++++++++++++++
 .../table/log/AbstractHoodieLogRecordReader.java   |  6 ++--
 .../hudi/common/table/log/HoodieLogFileReader.java |  3 +-
 .../hudi/io/hadoop/HoodieAvroParquetReader.java    | 11 +++++--
 .../avro/HoodieAvroParquetReaderBuilder.java       |  7 +++--
 .../org/apache/parquet/schema/SchemaRepair.java    |  1 +
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  6 ++--
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  3 +-
 .../org/apache/hudi/IncrementalRelation.scala      | 23 ++++++++-------
 .../hudi/MergeOnReadIncrementalRelation.scala      | 10 +++++--
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  7 ++++-
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |  6 ++--
 .../Spark34LegacyHoodieParquetFileFormat.scala     |  2 +-
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |  6 ++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 30 +++++++++----------
 24 files changed, 168 insertions(+), 72 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 56ebe6a8ed1c..66ff6b6e43e6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
@@ -244,8 +245,11 @@ public class HoodieIndexUtils {
         .filterCompletedInstants()
         .lastInstant()
         .map(HoodieInstant::getTimestamp);
+    Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());
+    boolean hasTimestampFields = 
AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getStorageConf(),
+        () -> baseFileReaderSchema != null && 
AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema));
     return partitionLocations.flatMap(p
-        -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, 
Pair.of(p.getKey(), p.getValue()))
+        -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, 
Pair.of(p.getKey(), p.getValue()), hasTimestampFields)
         .getMergedRecords().iterator());
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index 8324034c4927..28675c6779a8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -56,26 +56,23 @@ public class HoodieMergedReadHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K
   protected final Schema baseFileReaderSchema;
   private final Option<FileSlice> fileSliceOpt;
 
-  public HoodieMergedReadHandle(HoodieWriteConfig config,
-                                Option<String> instantTime,
-                                HoodieTable<T, I, K, O> hoodieTable,
-                                Pair<String, String> partitionPathFileIDPair) {
-    this(config, instantTime, hoodieTable, partitionPathFileIDPair, 
Option.empty());
+  public HoodieMergedReadHandle(HoodieWriteConfig config, Option<String> 
instantTime,
+                                HoodieTable<T, I, K, O> hoodieTable, 
Pair<String, String> partitionPathFileIDPair,
+                                boolean hasTimestampFields) {
+    this(config, instantTime, hoodieTable, partitionPathFileIDPair, 
hasTimestampFields, Option.empty());
   }
 
-  public HoodieMergedReadHandle(HoodieWriteConfig config,
-                                Option<String> instantTime,
-                                HoodieTable<T, I, K, O> hoodieTable,
-                                Pair<String, String> partitionPathFileIDPair,
-                                Option<FileSlice> fileSliceOption) {
+  public HoodieMergedReadHandle(HoodieWriteConfig config, Option<String> 
instantTime,
+                                HoodieTable<T, I, K, O> hoodieTable, 
Pair<String, String> partitionPathFileIDPair,
+                                boolean hasTimestampFields, Option<FileSlice> 
fileSliceOption) {
     super(config, instantTime, hoodieTable, partitionPathFileIDPair);
     Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
     // config.getSchema is not canonicalized, while config.getWriteSchema is 
canonicalized. So, we have to use the canonicalized schema to read the existing 
data.
-    baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());
+    this.baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());
     fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : 
getLatestFileSlice();
     // Repair reader schema.
     // Assume writer schema should be correct. If not, no repair happens.
-    readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, 
baseFileReaderSchema);
+    readerSchema = hasTimestampFields ? 
AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, 
this.baseFileReaderSchema) : orignalReaderSchema;
   }
 
   public List<HoodieRecord<T>> getMergedRecords() {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 6289d652cca7..cf9e988e0a8d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieIndexPlan;
@@ -75,6 +77,7 @@ import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -584,8 +587,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       final String partition = partitionAndFileSlice.getKey();
       final FileSlice fileSlice = partitionAndFileSlice.getValue();
       final String fileId = fileSlice.getFileId();
-      return new HoodieMergedReadHandle(dataWriteConfig, instantTime, 
hoodieTable, Pair.of(partition, fileSlice.getFileId()),
-          Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
+      Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(dataWriteConfig.getWriteSchema()), 
dataWriteConfig.allowOperationMetadataField());
+      boolean hasTimestampFields = baseFileReaderSchema != null && 
AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema);
+      return new HoodieMergedReadHandle(dataWriteConfig, instantTime, 
hoodieTable, Pair.of(partition, fileSlice.getFileId()), hasTimestampFields, 
Option.of(fileSlice))
+          .getMergedRecords().stream().map(record -> {
             HoodieRecord record1 = (HoodieRecord) record;
             return 
HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), 
partition, fileId,
             record1.getCurrentLocation().getInstantTime(), 0);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 284de408d2da..59b0d3a1eb01 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -87,8 +87,12 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
     HoodieFileReader bootstrapFileReader = null;
 
     Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    Schema readerSchema = 
AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);
-
+    Schema readerSchema;
+    if (!table.isMetadataTable() && 
AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), () -> 
true)) {
+      readerSchema = 
AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);
+    } else {
+      readerSchema = baseFileReader.getSchema();
+    }
 
     // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
     // persisted records to adhere to an evolved schema
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 9defec99c38e..bdcb40c0ffb5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.compact;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.WriteStatus;
@@ -130,6 +131,12 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
     TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
     // if this is a MDT, set up the instant range of log reader just like 
regular MDT snapshot reader.
     Option<InstantRange> instantRange = 
CompactHelpers.getInstance().getInstantRange(metaClient);
+    // Since we are using merge handle here, we can directly query the write 
schema from conf
+    // Write handle provides an option to use overridden write schema as well 
which is not used by merge handle
+    Schema writerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());
+    if (!table.isMetadataTable()) {
+      
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> 
AvroSchemaUtils.hasTimestampMillisField(writerSchema));
+    }
     return context.parallelize(operations).map(operation -> compact(
         compactionHandler, metaClient, config, operation, 
compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, 
executionHelper))
         .flatMap(List::iterator);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index a52dd06ff486..0321626e8a0f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -128,10 +128,8 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
-    // Set configuration for timestamp_millis type repair.
-    if 
(!storage.getConf().contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) 
{
-      storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, 
Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema)));
-    }
+    // Set configuration for timestamp_millis type repair (only when not 
already set).
+    AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(storage.getConf(), () -> 
AvroSchemaUtils.hasTimestampMillisField(readerSchema));
     MessageType fileSchema = getFileSchema();
     Schema nonNullSchema = 
AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
     Option<MessageType> messageSchema = 
Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index f9b474eedd47..68a58b706ac4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.WriteStatus;
 import 
org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -43,6 +45,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.index.HoodieIndex;
@@ -256,6 +259,22 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
       // Partition only
       partitionedRDD = mappedRDD.partitionBy(partitioner);
     }
+
+    if (!table.isMetadataTable() && 
table.getMetaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()
 > 0) {
+      TableSchemaResolver schemaResolver = new 
TableSchemaResolver(table.getMetaClient());
+      try {
+        
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> 
{
+          try {
+            return 
AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema());
+          } catch (Exception e) {
+            return true;
+          }
+        });
+      } catch (Exception e) {
+        throw new HoodieException("Failed to set logical ts related configs", 
e);
+      }
+    }
+
     return 
HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition,
 recordItr) -> {
       if (WriteOperationType.isChangingRecords(operationType)) {
         return handleUpsertPartition(instantTime, partition, recordItr, 
partitioner);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
index fd0e878d482a..cee0ceac9749 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.AWSDmsAvroPayload;
@@ -41,6 +43,7 @@ import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -198,7 +201,10 @@ public class TestHoodieMergedReadHandle extends 
SparkClientFunctionalTestHarness
         .collect(Collectors.toList());
     assertEquals(1, partitionPathAndFileIDPairs.size());
     String latestCommitTime = 
table.getActiveTimeline().lastInstant().get().getTimestamp();
-    HoodieMergedReadHandle mergedReadHandle = new 
HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, 
partitionPathAndFileIDPairs.get(0));
+    Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(writeConfig.getWriteSchema()), 
writeConfig.allowOperationMetadataField());
+    boolean hasTimestampFields = baseFileReaderSchema != null && 
AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema);
+    HoodieMergedReadHandle mergedReadHandle = new 
HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, 
partitionPathAndFileIDPairs.get(0),
+        hasTimestampFields);
     List<HoodieRecord> mergedRecords = mergedReadHandle.getMergedRecords();
     assertEquals(totalRecords, mergedRecords.size());
     List<HoodieRecord> sortedMergedRecords = mergedRecords.stream()
diff --git 
a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java 
b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java
index 78908a40ae52..8546b1b8a5e3 100644
--- a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java
+++ b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java
@@ -22,6 +22,7 @@ package org.apache.parquet.schema;
 import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.exception.HoodieException;
 
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 060a99422d63..3ef4f151645f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -28,9 +28,12 @@ import 
org.apache.hudi.exception.SchemaCompatibilityException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.TableChanges;
 import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.conf.Configuration;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -42,6 +45,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.CollectionUtils.reduce;
@@ -531,4 +535,34 @@ public class AvroSchemaUtils {
       return false;
     }
   }
+
+  /**
+   * Sets the logical timestamp repair key in conf to the value from valueSupplier, but only if the key is not already set.
+   */
+  public static void setLogicalTimestampRepairIfNotSet(Configuration conf, 
Supplier<Boolean> valueSupplier) {
+    if (conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR) == null) {
+      conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, 
valueSupplier.get().toString());
+    }
+  }
+
+  /**
+   * Sets the logical timestamp repair key in the storage conf to the value from valueSupplier, but only if the key is not already present.
+   */
+  public static void setLogicalTimestampRepairIfNotSet(StorageConfiguration 
conf, Supplier<Boolean> valueSupplier) {
+    if (!conf.contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) {
+      conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, 
valueSupplier.get().toString());
+    }
+  }
+
+  /**
+   * Returns the boolean value of the logical timestamp repair key when it is set to a non-empty value in conf; otherwise returns the value from defaultValueSupplier.
+   */
+  public static boolean isLogicalTimestampRepairNeeded(StorageConfiguration 
conf, Supplier<Boolean> defaultValueSupplier) {
+    Option<String> valueOpt = 
conf.getString(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR);
+    if (valueOpt.isEmpty() || StringUtils.isNullOrEmpty(valueOpt.get())) {
+      return defaultValueSupplier.get();
+    } else {
+      return Boolean.parseBoolean(valueOpt.get());
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index d62a25b1e1ad..dc610a16758d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -43,7 +43,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
-import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -70,7 +69,6 @@ import java.util.stream.Collectors;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
-import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
@@ -186,8 +184,8 @@ public abstract class AbstractHoodieLogRecordReader {
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? 
InternalSchema.getEmptyInternalSchema() : internalSchema;
     this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
-    this.enableLogicalTimestampFieldRepair = 
!hoodieTableMetaClient.isMetadataTable() && 
storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
-        () -> readerSchema != null && 
AvroSchemaUtils.hasTimestampMillisField(readerSchema));
+    this.enableLogicalTimestampFieldRepair = 
!hoodieTableMetaClient.isMetadataTable()
+        && AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), 
() -> readerSchema != null && 
AvroSchemaUtils.hasTimestampMillisField(readerSchema));
 
     if (keyFieldOverride.isPresent()) {
       // NOTE: This branch specifically is leveraged handling Metadata Table
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 44fc5ec6fd1f..f8e873d14239 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -39,7 +39,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.util.IOUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -100,7 +99,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
                              boolean enableRecordLookups, String keyField) 
throws IOException {
     this(storage, logFile, readerSchema, bufferSize, reverseReader, 
enableRecordLookups, keyField,
         InternalSchema.getEmptyInternalSchema(),
-        
storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
+        AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(),
             () -> readerSchema != null && 
AvroSchemaUtils.hasTimestampMillisField(readerSchema)));
   }
 
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index fcd851efd47f..f179167c41a5 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.io.hadoop;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -172,7 +173,12 @@ public class HoodieAvroParquetReader extends 
HoodieAvroFileReader {
     //       sure that in case the file-schema is not equal to read-schema 
we'd still
     //       be able to read that file (in case projection is a proper one)
     Configuration hadoopConf = 
storage.getConf().unwrapCopyAs(Configuration.class);
-    Schema repairedFileSchema = getRepairedSchema(getSchema(), schema);
+    Schema repairedFileSchema;
+    if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () 
-> true)) {
+      repairedFileSchema = getRepairedSchema(getSchema(), schema);
+    } else {
+      repairedFileSchema = schema;
+    }
     Option<Schema> promotedSchema = Option.empty();
     if (!renamedColumns.isEmpty() || 
HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema,
 schema)) {
       AvroReadSupport.setAvroReadSchema(hadoopConf, repairedFileSchema);
@@ -183,7 +189,8 @@ public class HoodieAvroParquetReader extends 
HoodieAvroFileReader {
       AvroReadSupport.setRequestedProjection(hadoopConf, schema);
     }
     ParquetReader<IndexedRecord> reader =
-        new HoodieAvroParquetReaderBuilder<IndexedRecord>(path)
+        new HoodieAvroParquetReaderBuilder<IndexedRecord>(path,
+            AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), 
() -> schema == null || AvroSchemaUtils.hasTimestampMillisField(schema)))
             .withTableSchema(schema)
             .withConf(hadoopConf)
             .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, 
hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
 
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
index 308fe34a726c..cb699155e9bb 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.parquet.avro;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StoragePath;
 
@@ -42,11 +41,13 @@ public class HoodieAvroParquetReaderBuilder<T> extends 
ParquetReader.Builder<T>
   private GenericData model = null;
   private boolean enableCompatibility = true;
   private boolean isReflect = true;
+  private boolean isLogicalTimestampRepairNeeded;
   private Schema tableSchema = null;
 
   @Deprecated
-  public HoodieAvroParquetReaderBuilder(StoragePath path) {
+  public HoodieAvroParquetReaderBuilder(StoragePath path, boolean 
isLogicalTimestampRepairNeeded) {
     super(new Path(path.toUri()));
+    this.isLogicalTimestampRepairNeeded = isLogicalTimestampRepairNeeded;
   }
 
   public HoodieAvroParquetReaderBuilder(InputFile file) {
@@ -88,6 +89,6 @@ public class HoodieAvroParquetReaderBuilder<T> extends 
ParquetReader.Builder<T>
       conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
     }
     return new HoodieAvroReadSupport<>(model, 
Option.ofNullable(tableSchema).map(schema -> 
getAvroSchemaConverter(conf).convert(schema)),
-        tableSchema == null || 
AvroSchemaUtils.hasTimestampMillisField(tableSchema));
+        isLogicalTimestampRepairNeeded);
   }
 }
diff --git 
a/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java
 
b/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java
index b85b3b98fac3..8a4b53e6ab6e 100644
--- 
a/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java
+++ 
b/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java
@@ -20,6 +20,7 @@
 package org.apache.parquet.schema;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 0534f3dac8ae..3b27bc165b87 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -116,6 +116,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
   protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
 
+  protected lazy val hasTimestampMillisFieldInTableSchema: Boolean = 
HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema)
+
   protected lazy val basePath: Path = new Path(metaClient.getBasePath.toUri)
 
   // NOTE: Record key-field is assumed singular here due to the either of
@@ -245,7 +247,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
         // We're delegating to Spark to append partition values to every row 
only in cases
         // when these corresponding partition-values are not persisted w/in 
the data file itself
         val parquetFileFormat = 
sparkAdapter.createLegacyHoodieParquetFileFormat(
-          shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema, 
hasTimestampMillisFieldInTableSchema = 
HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema)).get
+          shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema, 
hasTimestampMillisFieldInTableSchema = hasTimestampMillisFieldInTableSchema).get
         (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID)
     }
 
@@ -555,7 +557,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
             // when these corresponding partition-values are not persisted 
w/in the data file itself
             appendPartitionValues = 
shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath),
             tableAvroSchema,
-            hasTimestampMillisFieldInTableSchema = 
HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema)
+            hasTimestampMillisFieldInTableSchema = 
hasTimestampMillisFieldInTableSchema
           )
           // Since partition values by default are omitted, and not persisted 
w/in data-files by Spark,
           // data-file readers (such as [[ParquetFileFormat]]) have to inject 
partition values while reading
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index db538f110c90..5f78002dddda 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,9 +23,11 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
 import org.apache.hudi.HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK
 import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
+import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.io.storage.HoodieFileReader
 import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -85,7 +87,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
 
   protected val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
-
   private val hadoopConfBroadcast = sc.broadcast(new 
SerializableWritable(config))
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 527a7c6c5f51..3731a7b5e0c7 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,6 +17,9 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{GlobPattern, Path}
 import org.apache.hudi.avro.AvroSchemaUtils
 import 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME
 import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
@@ -38,9 +41,10 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.hudi.io.storage.HoodieFileReader
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.table.HoodieSparkTable
-
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.GlobPattern
+import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
 import 
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
@@ -205,16 +209,15 @@ class IncrementalRelation(val sqlContext: SQLContext,
       // pass internalSchema to hadoopConf, so it can be used in executors.
       val validCommits = metaClient
         
.getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
-      
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
 SerDeHelper.toJson(internalSchema))
-      
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH,
 metaClient.getBasePath.toString)
-      
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
 validCommits)
+      val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
SerDeHelper.toJson(internalSchema))
+      conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, 
metaClient.getBasePath.toString)
+      conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, 
validCommits)
       // Pass table Avro schema to hadoopConf so supportBatch() can find it 
(supportBatch does not receive options).
       if (tableAvroSchema.getType != Schema.Type.NULL) {
-        LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf(
-          sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema)
-        sqlContext.sparkContext.hadoopConfiguration.set(
-          HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
-          AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString)
+        LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf(conf, 
tableAvroSchema)
+        AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(conf,
+          JFunction.toJavaSupplier(() => 
AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).asInstanceOf[java.lang.Boolean]))
       }
       val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
         case HoodieFileFormat.PARQUET => 
LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
@@ -255,7 +258,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
             val timer = HoodieTimer.start
 
             val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
-            val storageConf = 
HadoopFSUtils.getStorageConfWithCopy(sqlContext.sparkContext.hadoopConfiguration)
+            val storageConf = HadoopFSUtils.getStorageConfWithCopy(conf)
             val localBasePathStr = basePath.toString
             val firstNotFoundPath = 
sqlContext.sparkContext.parallelize(allFilesToCheck.toSeq, allFilesToCheck.size)
               .map(path => {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 4d83a4d5288d..c340a5507ea8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,9 +17,10 @@
 
 package org.apache.hudi
 
-import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
+import org.apache.hadoop.fs.GlobPattern
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
+import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
@@ -29,10 +30,10 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
+import org.apache.hudi.io.storage.HoodieFileReader
 import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths
 import org.apache.hudi.storage.StoragePathInfo
-
-import org.apache.hadoop.fs.GlobPattern
+import org.apache.hudi.util.JFunction
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
@@ -81,6 +82,9 @@ case class MergeOnReadIncrementalRelation(override val 
sqlContext: SQLContext,
     val optionalFilters = filters
     val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
 
+    if (!metaClient.isMetadataTable) {
+      AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, 
JFunction.toJavaSupplier(() => 
hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean]))
+    }
     new HoodieMergeOnReadRDD(
       sqlContext.sparkContext,
       config = jobConf,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 5b6be9c55857..deb3cdf12a4b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -22,10 +22,12 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, 
isProjectionCompatible}
-import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.io.storage.HoodieFileReader
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.util.JFunction
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
@@ -115,6 +117,9 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: 
SQLContext,
     val optionalFilters = filters
     val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
 
+    if (!metaClient.isMetadataTable) {
+      AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, 
JFunction.toJavaSupplier(() => 
hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean]))
+    }
     new HoodieMergeOnReadRDD(
       sqlContext.sparkContext,
       config = jobConf,
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index ab73ded838e7..df7c5ead29ee 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.storage.HoodieStorage
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.util.JFunction
 import 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
 import org.apache.parquet.schema.{MessageType, SchemaRepair, Type}
 import org.apache.spark.sql.avro._
@@ -82,8 +83,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
 
   override def getParquetReadSupport(storage: HoodieStorage,
                                      messageSchema: 
org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
-    val enableTimestampFieldRepair = storage.getConf.getBoolean(
-      HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)
+    val enableTimestampFieldRepair = 
AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, 
JFunction.toJavaSupplier(() => true))
     new HoodieParquetReadSupport(
       Option.empty[ZoneId],
       enableVectorizedReader = true,
@@ -169,7 +169,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
     val cachedRequestedSchema = 
HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema)
     val requestedSchemaInMessageType = org.apache.hudi.common.util.Option.of(
       
getAvroSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(nonNullRequestedSchema))
-    val enableTimestampFieldRepair = 
storage.getConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, 
true)
+    val enableTimestampFieldRepair = 
AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, 
JFunction.toJavaSupplier(() => true))
     val repairedRequestedSchema = 
repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, 
requestedSchemaInMessageType)
     val repairedRequestedStructType = new 
ParquetToSparkSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(repairedRequestedSchema)
     val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, 
cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone)
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 11dcec7af06b..75c874580b40 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -377,7 +377,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
           new HoodieParquetReadSupport(
             convertTz,
             enableVectorizedReader = false,
-            enableTimestampFieldRepair = true,
+            enableTimestampFieldRepair = hasTimestampMillisFieldInTableSchema,
             datetimeRebaseSpec,
             int96RebaseSpec,
             tableSchemaAsMessageType)
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index 61aa3f1a915b..a8aac2d4f291 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.{Spark35HoodieFileScanRDD, SparkAdapterSupport$}
 import org.apache.hudi.io.storage.HoodieFileReader
 import org.apache.hudi.storage.HoodieStorage
+import org.apache.hudi.util.JFunction
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
@@ -139,8 +140,7 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
 
   override def getParquetReadSupport(storage: HoodieStorage,
                                      messageSchema: 
org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
-    val enableTimestampFieldRepair = storage.getConf.getBoolean(
-      HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)
+    val enableTimestampFieldRepair = 
AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, 
JFunction.toJavaSupplier(() => true))
     new HoodieParquetReadSupport(
       Option.empty[ZoneId],
       enableVectorizedReader = true,
@@ -166,7 +166,7 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
     val nonNullRequestedSchema = 
AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema)
     val cachedRequestedSchema = 
HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema)
     val requestedSchemaInMessageType = 
org.apache.hudi.common.util.Option.of(getAvroSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(nonNullRequestedSchema))
-    val enableTimestampFieldRepair = 
storage.getConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, 
true)
+    val enableTimestampFieldRepair = 
AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, 
JFunction.toJavaSupplier(() => true))
     val repairedRequestedSchema = 
repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, 
requestedSchemaInMessageType)
     val repairedRequestedStructType = new 
ParquetToSparkSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(repairedRequestedSchema)
     val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, 
cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone)
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 5f2d81d73a80..ba54be6de357 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -788,8 +788,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"SIX,AVRO,CLUSTER", "CURRENT,AVRO,NONE", 
"CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"})
-  void testCOWLogicalRepair(String tableVersion, String recordType, String 
operation) throws Exception {
+  @CsvSource(value = {"CLUSTER,AVRO", "NONE,AVRO", "CLUSTER,SPARK", 
"NONE,SPARK"})
+  void testCOWLogicalRepair(String operation, String recordType) throws 
Exception {
     timestampNTZCompatibility(() -> {
       try {
         String dirName = "trips_logical_types_json_cow_write";
@@ -813,6 +813,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         properties.setProperty("hoodie.metatata.enable", "true");
         properties.setProperty("hoodie.parquet.small.file.limit", "-1");
         properties.setProperty("hoodie.cleaner.commits.retained", "10");
+        if (recordType.equals("SPARK")) {
+          properties.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), 
"org.apache.hudi.HoodieSparkRecordMerger");
+        } else {
+          properties.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), 
"org.apache.hudi.common.model.HoodieAvroRecordMerger");
+        }
         Option<TypedProperties> propt = Option.of(properties);
 
         new HoodieStreamer(prepCfgForCowLogicalRepair(tableBasePath, "456"), 
jsc, propt).sync();
@@ -864,17 +869,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @ParameterizedTest
   @CsvSource(value = {
-      "SIX,AVRO,CLUSTER,AVRO",
-      "CURRENT,AVRO,NONE,AVRO",
-      "CURRENT,AVRO,CLUSTER,AVRO",
-      "CURRENT,AVRO,COMPACT,AVRO",
-      "CURRENT,AVRO,NONE,PARQUET",
-      "CURRENT,AVRO,CLUSTER,PARQUET",
-      "CURRENT,AVRO,COMPACT,PARQUET",
-      "CURRENT,SPARK,NONE,PARQUET",
-      "CURRENT,SPARK,CLUSTER,PARQUET",
-      "CURRENT,SPARK,COMPACT,PARQUET"})
-  void testMORLogicalRepair(String tableVersion, String recordType, String 
operation, String logBlockType) throws Exception {
+      "CLUSTER,AVRO",
+      "NONE,AVRO",
+      "COMPACT,AVRO",
+      "NONE,PARQUET",
+      "CLUSTER,PARQUET",
+      "COMPACT,PARQUET"
+  })
+  void testMORLogicalRepair(String operation, String logBlockType) throws 
Exception {
     timestampNTZCompatibility(() -> {
       try {
         String tableSuffix;
@@ -909,8 +911,6 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         
properties.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
schemaPath);
         String inputDataPath = 
getClass().getClassLoader().getResource("logical-repair/mor_write_updates/5").toURI().toString();
         properties.setProperty("hoodie.streamer.source.dfs.root", 
inputDataPath);
-        String mergerClass = getMergerClassForRecordType(recordType);
-        String tableVersionString = getTableVersionCode(tableVersion);
 
         properties.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
         properties.setProperty("hoodie.datasource.write.precombine.field", 
"timestamp");

Reply via email to