nsivabalan commented on code in PR #17601:
URL: https://github.com/apache/hudi/pull/17601#discussion_r2743870450
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -95,11 +97,14 @@ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema
   public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile,
                              Schema readerSchema, int bufferSize, boolean reverseReader,
                              boolean enableRecordLookups, String keyField)
       throws IOException {
-    this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
+    this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField,
+        InternalSchema.getEmptyInternalSchema(),
+        readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema));
Review Comment:
We should try removing this.
Let's always look it up in the hadoop conf/storageConf, and set the value in the driver before invoking these classes.
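
For illustration, a minimal sketch of that flow, not part of the patch. The key string, class, and method names here are hypothetical; the patch refers to the key via ENABLE_LOGICAL_TIMESTAMP_REPAIR, and hasTimestampMillisField is the helper this PR adds to AvroSchemaUtils (assumed to live in org.apache.hudi.avro):

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.AvroSchemaUtils;

final class TimestampRepairFlagSketch {
  // Hypothetical key string; the patch references it via ENABLE_LOGICAL_TIMESTAMP_REPAIR.
  static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "hoodie.logical.timestamp.repair.enable";

  // Driver side: derive the flag once from the table schema and stash it in the
  // conf before any reader is constructed.
  static void setOnDriver(Configuration conf, Schema tableSchema) {
    conf.setBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR,
        tableSchema != null && AvroSchemaUtils.hasTimestampMillisField(tableSchema));
  }

  // Reader side (e.g. HoodieLogFileReader): look the flag up from the conf
  // instead of receiving it as an extra constructor argument.
  static boolean isEnabled(Configuration conf) {
    return conf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, false);
  }
}
```

This keeps the schema parsing in a single place on the driver and spares every reader constructor a new boolean parameter.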
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -129,6 +129,8 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
+    // Set configuration for timestamp_millis type repair.
+    storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema)));
Review Comment:
We should try to read the config set by the driver instead of doing this. If it's not set, then we can parse the schema and set it.
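
As a sketch of that fallback (same hypothetical key string and assumptions as above): read the driver-set value first, and only parse the schema, caching the result back into the conf, when the key is absent:

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.AvroSchemaUtils;

final class RepairFlagFallbackSketch {
  // Hypothetical key string, standing in for ENABLE_LOGICAL_TIMESTAMP_REPAIR.
  static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "hoodie.logical.timestamp.repair.enable";

  // Prefer the value the driver already set; fall back to parsing the schema,
  // and cache the derived value in the conf so it is computed at most once.
  static boolean resolve(Configuration conf, Schema readerSchema) {
    String configured = conf.get(ENABLE_LOGICAL_TIMESTAMP_REPAIR);
    if (configured != null) {
      return Boolean.parseBoolean(configured);
    }
    boolean derived = readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema);
    conf.setBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, derived);
    return derived;
  }
}
```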
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -182,6 +185,7 @@ protected AbstractHoodieLogRecordReader(HoodieStorage storage, String basePath,
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
     this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
+    this.enableLogicalTimestampFieldRepair = readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema);
Review Comment:
Can we check if hadoopConf already contains the info and fetch it from there?
Also, can we populate the value in hadoopConf only and use that to pass it around? I want to avoid passing individual boolean flags like we are currently doing in this patch.
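
Applied to this constructor, the assignment could reduce to a single conf lookup. A fragment-level sketch, assuming the driver populated the key beforehand and that StorageConfiguration exposes a getBoolean(key, default) accessor:

```java
// Sketch only: trust the conf value set by the driver instead of re-deriving
// the flag from readerSchema in every log record reader; default to false.
this.enableLogicalTimestampFieldRepair =
    storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, false);
```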
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala:
##########
@@ -506,4 +522,70 @@ object Spark35LegacyHoodieParquetFileFormat {
original.getBlocks
)
}
+
+  // Helper to replace filters on timestamp-millis columns with AlwaysTrue to avoid incorrect filter pushdown.
+  // This preserves compound filters (And/Or) so other parts can still be pushed down.
+  private def replaceTimestampMillisFiltersWithAlwaysTrue(filters: Seq[Filter],
+                                                          avroTableSchema: Schema,
Review Comment:
Same here.
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala:
##########
@@ -237,7 +249,11 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
             isCaseSensitive,
             datetimeRebaseSpec)
         }
-        filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
+        // Filter out timestamp-millis columns from filter pushdown to avoid incorrect filtering.
+        // We replace filters on timestamp-millis columns with AlwaysTrue to preserve compound filters.
+        val filtersToPushDown = replaceTimestampMillisFiltersWithAlwaysTrue(
Review Comment:
We should not be needing this.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -366,10 +365,34 @@ case class HoodieFileIndex(spark: SparkSession,
// threshold (of 100k records)
      val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
+      // Identify timestamp-millis columns from the Avro schema to skip from filter translation
+      // (even if they're in the index, they may have been indexed before the fix and should not be used for filtering)
Review Comment:
Can we move this to a separate method?
##########
hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala:
##########
@@ -234,7 +246,11 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
             isCaseSensitive,
             datetimeRebaseSpec)
         }
-        filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
+        // Filter out timestamp-millis columns from filter pushdown to avoid incorrect filtering.
+        // We replace filters on timestamp-millis columns with AlwaysTrue to preserve compound filters.
+        val filtersToPushDown = Spark34LegacyHoodieParquetFileFormat
Review Comment:
This should not be required; we should remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]