nsivabalan commented on code in PR #17601:
URL: https://github.com/apache/hudi/pull/17601#discussion_r2734691887
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -46,6 +46,7 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
Review Comment:
Should we remove this change?
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1040,12 +1051,35 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
case NULL:
case BOOLEAN:
case INT:
- case LONG:
case FLOAT:
case DOUBLE:
case BYTES:
case STRING:
return oldValue;
+ case LONG:
+ if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
Review Comment:
Where did you get this fix from?
I don't see it in the latest master.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -143,7 +144,7 @@ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] conten
checkState(this.readerSchema != null, "Reader's schema has to be non-null");
checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record");
// TODO AvroSparkReader need
- RecordIterator iterator = RecordIterator.getInstance(this, content);
+ RecordIterator iterator = RecordIterator.getInstance(this, content, true);
Review Comment:
if we have record context, shouldn't we do
```
RecordIterator.getInstance(this, content, readerContext.enableLogicalTimestampFieldRepair());
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -143,7 +144,7 @@ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] conten
checkState(this.readerSchema != null, "Reader's schema has to be non-null");
checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record");
// TODO AvroSparkReader need
- RecordIterator iterator = RecordIterator.getInstance(this, content);
+ RecordIterator iterator = RecordIterator.getInstance(this, content, true);
Review Comment:
Maybe in 1.x we are not using this method anymore but the other one.
But for 0.x, let's see how we can wire in the config.
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1436,4 +1474,107 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
}
}
+ /**
+ * Checks if a logical type is an instance of LocalTimestampMillis using reflection.
+ * Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
+ */
+ private static boolean isLocalTimestampMillis(LogicalType logicalType) {
+ if (logicalType == null) {
+ return false;
+ }
+ try {
+ Class<?> localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis");
Review Comment:
Can you confirm that we are calling Class.forName(...) only once per schema or file group, and not per record?
Wondering if it would be worth adding a JVM-level cache in some util class where we can access the availability of these classes.
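For illustration, one way such a JVM-wide availability cache could look (the class and method names here are hypothetical, not from the PR): the `Class.forName` lookup runs once at class-initialization time, and every later check is a plain field read.

```java
// Hypothetical sketch of a JVM-wide availability cache; names are illustrative.
public final class AvroClassAvailability {

  // Resolved once when this class is initialized; later checks are field reads.
  private static final boolean LOCAL_TIMESTAMP_MILLIS_AVAILABLE =
      isClassPresent("org.apache.avro.LogicalTypes$LocalTimestampMillis");

  private AvroClassAvailability() {
  }

  // Performs the Class.forName lookup; callers should cache the result,
  // as done for the static field above.
  public static boolean isClassPresent(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  public static boolean hasLocalTimestampMillis() {
    return LOCAL_TIMESTAMP_MILLIS_AVAILABLE;
  }
}
```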
##########
hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala:
##########
@@ -66,6 +78,31 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
.putBoolean(METADATA_COL_ATTR_KEY, value = true)
.build()
+ override def isTimestampNTZType(dataType: DataType): Boolean = {
+ dataType == DataTypes.TimestampNTZType
+ }
+
+ override def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
+ new HoodieParquetReadSupport(
+ Option.empty[ZoneId],
+ enableVectorizedReader = true,
Review Comment:
same comment as Spark3_5Adapter
##########
.github/workflows/bot.yml:
##########
@@ -268,25 +268,33 @@ jobs:
distribution: 'temurin'
architecture: x64
cache: maven
+ - name: Verify Java 17 version
Review Comment:
Java 17 is only required for Spark 4.
Why do we need to pull in these changes?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala:
##########
@@ -93,6 +93,8 @@ object HoodieSparkKryoRegistrar {
private val KRYO_USER_REGISTRATORS = "spark.kryo.registrator"
def register(conf: SparkConf): SparkConf = {
- conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[HoodieSparkKryoRegistrar].getName).mkString(","))
+ // Use class name directly to avoid Scala collection binary compatibility issues
+ // when compiled with Scala 2.13 but running with Spark 3.5 (Scala 2.12)
+ conf.set(KRYO_USER_REGISTRATORS, classOf[HoodieSparkKryoRegistrar].getName)
Review Comment:
We don't have support for Scala 2.13 in 0.x, right? Why do we need this change?
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.avro.Schema;
+
+/**
+ * An Avro schema cache implementation for reusing Avro schema instances in JVM/process scope.
+ * This is a global cache which works for a JVM lifecycle.
+ * A collection of schema instances is maintained.
+ *
+ * <p> NOTE: Schemas which are used frequently should be cached through this cache.
+ */
+public class AvroSchemaCache {
+
+
Review Comment:
why two line breaks?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java:
##########
@@ -68,10 +69,13 @@ public HoodieMergedReadHandle(HoodieWriteConfig config,
Pair<String, String> partitionPathFileIDPair,
Option<FileSlice> fileSliceOption) {
super(config, instantTime, hoodieTable, partitionPathFileIDPair);
- readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
+ Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
// config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice();
+ // Repair reader schema.
+ // Assume writer schema should be correct. If not, no repair happens.
+ readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, baseFileReaderSchema);
Review Comment:
Don't we need to flip the 1st and 2nd arguments to this method (`getRepairedSchema`)?
Also, why are we fixing only `readerSchema` and not `baseFileReaderSchema`?
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1040,12 +1051,35 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
case NULL:
case BOOLEAN:
case INT:
- case LONG:
case FLOAT:
case DOUBLE:
case BYTES:
case STRING:
return oldValue;
+ case LONG:
+ if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
+ if (oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
+ return oldValue;
+ } else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
+ if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
+ return DateTimeUtils.millisToMicros((Long) oldValue);
+ }
+ } else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
+ if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
+ return DateTimeUtils.microsToMillis((Long) oldValue);
+ }
+ } else if (isLocalTimestampMillis(oldSchema.getLogicalType())) {
+ if (isLocalTimestampMicros(newSchema.getLogicalType())) {
+ return DateTimeUtils.millisToMicros((Long) oldValue);
Review Comment:
is this evolution allowed?
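For reference, the unit conversions the new branch performs are plain long arithmetic. A minimal standalone equivalent (assuming Hudi's `DateTimeUtils` behaves like this; I have not verified its handling of pre-epoch values) would be:

```java
// Minimal standalone sketch of the millis <-> micros conversions used above.
// Hudi's DateTimeUtils is assumed to do the equivalent; this is illustrative only.
public final class TimestampUnitConversions {

  private TimestampUnitConversions() {
  }

  // 1 millisecond = 1000 microseconds; multiplyExact guards against overflow.
  public static long millisToMicros(long millis) {
    return Math.multiplyExact(millis, 1_000L);
  }

  // floorDiv truncates toward negative infinity, so ordering is preserved
  // for pre-epoch (negative) timestamps as well.
  public static long microsToMillis(long micros) {
    return Math.floorDiv(micros, 1_000L);
  }
}
```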
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -637,6 +637,19 @@ public final List<StoragePath> getPartitionPaths() {
}
}
+ public final List<String> getPartitionNames() {
Review Comment:
Why do we need this?
It's not related to the logical ts fixes, right?
##########
hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java:
##########
@@ -42,13 +43,12 @@ public class ConvertingGenericData extends GenericData {
private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
- // NOTE: Those are not supported in Avro 1.8.2
- // TODO re-enable upon upgrading to 1.10
- // private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
- // private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
- // private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
- // private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
-
+ // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
+ // Use reflection to conditionally initialize them only if available
+ private static final Object TIMESTAMP_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$TimestampMillisConversion");
Review Comment:
Looks like we still support Spark 2.x in the 0.x release lines.
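For context, a reflective factory like the `createConversionIfAvailable` referenced above could plausibly be shaped as follows (a sketch only; the PR's actual helper may differ):

```java
// Illustrative sketch only; the PR's actual createConversionIfAvailable may differ.
public final class ConversionFactory {

  private ConversionFactory() {
  }

  // Instantiates the named conversion if the class is on the classpath
  // (Avro >= 1.10); returns null otherwise (e.g. Avro 1.8.2 shipped with Spark 2.x),
  // so callers must null-check before registering the conversion.
  public static Object createConversionIfAvailable(String className) {
    try {
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | LinkageError e) {
      return null;
    }
  }
}
```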
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]