hudi-agent commented on code in PR #18809:
URL: https://github.com/apache/hudi/pull/18809#discussion_r3284581492
##########
hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java:
##########
@@ -252,21 +256,115 @@ public TimestampData readTimestamp() {
}
}
+ /**
+ * Reader for Parquet INT64 timestamp values (MILLIS / MICROS / NANOS), i.e.
the standard
+ * timestamp encoding defined by Parquet's
+ * {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} and the
legacy
+ * {@link OriginalType#TIMESTAMP_MILLIS} / {@link
OriginalType#TIMESTAMP_MICROS} annotations.
+ * (The older INT96 encoding is marked deprecated by the Parquet format spec
— see
+ * <a
href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp">
+ * LogicalTypes.md</a> — but is still supported here via {@link
TypesFromInt96PageReader} for
+ * backwards compatibility with files written by older Hive / Spark / Impala
versions.)
+ *
+ * <p>Used by {@link NestedPrimitiveColumnReader} when a TIMESTAMP column
sits inside a
+ * {@code Row}, {@code Array} or {@code Map}; the top-level path continues
to use
+ * {@link Int64TimestampColumnReader} for batched-vector efficiency.
+ */
+ public static class TypesFromInt64PageReader extends
DefaultParquetDataColumnReader {
+ private final boolean isUtcTimestamp;
+ private final ChronoUnit chronoUnit;
+
+ public TypesFromInt64PageReader(
+ ValuesReader realReader, boolean isUtcTimestamp, ChronoUnit
chronoUnit) {
+ super(realReader);
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.chronoUnit = chronoUnit;
+ }
+
+ public TypesFromInt64PageReader(
+ Dictionary dict, boolean isUtcTimestamp, ChronoUnit chronoUnit) {
+ super(dict);
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.chronoUnit = chronoUnit;
+ }
+
+ @Override
+ public TimestampData readTimestamp() {
+ return int64ToTimestamp(isUtcTimestamp, valuesReader.readLong(),
chronoUnit);
+ }
+
+ @Override
+ public TimestampData readTimestamp(int id) {
+ return int64ToTimestamp(isUtcTimestamp, dict.decodeToLong(id),
chronoUnit);
+ }
+ }
+
private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
boolean isDictionary,
PrimitiveType parquetType,
Dictionary dictionary,
ValuesReader valuesReader,
boolean isUtcTimestamp) {
- if (parquetType.getPrimitiveTypeName() ==
PrimitiveType.PrimitiveTypeName.INT96) {
+ PrimitiveType.PrimitiveTypeName typeName =
parquetType.getPrimitiveTypeName();
+ if (typeName == PrimitiveType.PrimitiveTypeName.INT96) {
return isDictionary
? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
: new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
- } else {
- return isDictionary
- ? new DefaultParquetDataColumnReader(dictionary)
- : new DefaultParquetDataColumnReader(valuesReader);
}
+ if (typeName == PrimitiveType.PrimitiveTypeName.INT64) {
+ ChronoUnit unit = resolveInt64TimestampUnit(parquetType);
+ if (unit != null) {
+ return isDictionary
+ ? new TypesFromInt64PageReader(dictionary, isUtcTimestamp, unit)
+ : new TypesFromInt64PageReader(valuesReader, isUtcTimestamp, unit);
+ }
+ }
+ return isDictionary
+ ? new DefaultParquetDataColumnReader(dictionary)
+ : new DefaultParquetDataColumnReader(valuesReader);
+ }
+
+ /**
+ * Returns the {@link ChronoUnit} for a Parquet INT64 TIMESTAMP column, or
{@code null} if the
+ * column is a plain INT64 (not a timestamp).
+ *
+ * <p>Supports both the modern {@link
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} and
+ * the legacy {@link OriginalType#TIMESTAMP_MILLIS} / {@link
OriginalType#TIMESTAMP_MICROS}
+ * encodings.
+ */
+ private static ChronoUnit resolveInt64TimestampUnit(PrimitiveType
parquetType) {
+ LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
+ if (annotation instanceof
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit unit =
+ ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
annotation).getUnit();
+ switch (unit) {
+ case MILLIS:
+ return ChronoUnit.MILLIS;
+ case MICROS:
+ return ChronoUnit.MICROS;
+ case NANOS:
+ return ChronoUnit.NANOS;
+ default:
+ return null;
+ }
+ }
+ OriginalType originalType = parquetType.getOriginalType();
+ if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+ return ChronoUnit.MILLIS;
+ }
+ if (originalType == OriginalType.TIMESTAMP_MICROS) {
+ return ChronoUnit.MICROS;
+ }
+ return null;
+ }
Review Comment:
🤖 nit: could you rename `utcTimestamp` to `isUtcTimestamp` here? Every other
use of this flag in the class — the field declaration, both constructor
parameters, and the `getDataColumnReaderByTypeHelper` parameter — uses the `is`
prefix, so dropping it in this private helper is a small inconsistency that may
trip up a future reader.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/table/format/cow/utils/NestedPositionUtil.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.utils;
+
+import org.apache.hudi.table.format.cow.vector.position.CollectionPosition;
+import org.apache.hudi.table.format.cow.vector.position.RowPosition;
+import org.apache.hudi.table.format.cow.vector.type.ParquetField;
+
+import static java.lang.String.format;
+
+/**
+ * Utils to calculate nested type position.
+ *
+ * <p>Note: Vendored from Apache Flink (FLINK-35702, {@code
+ * org.apache.flink.formats.parquet.utils.NestedPositionUtil}).
+ */
+public class NestedPositionUtil {
+
+ /**
+ * Calculate row offsets according to column's max repetition level,
definition level, value's
+ * repetition level and definition level. Each row has three situation:
+ * <li>Row is not defined,because it's optional parent fields is null, this
is decided by its
+ * parent's repetition level
+ * <li>Row is null
+ * <li>Row is defined and not empty.
+ *
+ * @param field field that contains the row column message include max
repetition level and
+ * definition level.
+ * @param fieldRepetitionLevels int array with each value's repetition level.
+ * @param fieldDefinitionLevels int array with each value's definition level.
+ * @return {@link RowPosition} contains collections row count and isNull
array.
+ */
+ public static RowPosition calculateRowOffsets(
+ ParquetField field, int[] fieldDefinitionLevels, int[]
fieldRepetitionLevels) {
+ int rowDefinitionLevel = field.getDefinitionLevel();
+ int rowRepetitionLevel = field.getRepetitionLevel();
+ int nullValuesCount = 0;
+ BooleanArrayList nullRowFlags = new BooleanArrayList(0);
+ for (int i = 0; i < fieldDefinitionLevels.length; i++) {
+ // If a row's last field is an array, the repetition levels for the
array's items will
+ // be larger than the parent row's repetition level, so we need to skip
those values.
+ if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
+ continue;
+ }
+
+ if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
+ // current row is defined and not empty
+ nullRowFlags.add(false);
+ } else {
+ // current row is null
+ nullRowFlags.add(true);
+ nullValuesCount++;
+ }
+ }
+ if (nullValuesCount == 0) {
+ return new RowPosition(null, fieldDefinitionLevels.length);
+ }
+ return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
+ }
+
+ /**
+ * Calculate the collection's offsets according to column's max repetition
level, definition
+ * level, value's repetition level and definition level. Each collection
(Array or Map) has four
+ * situation:
+ * <li>Collection is not defined, because optional parent fields is null,
this is decided by its
+ * parent's repetition level
+ * <li>Collection is null
+ * <li>Collection is defined but empty
+ * <li>Collection is defined and not empty. In this case offset value is
increased by the number
+ * of elements in that collection
+ *
+ * @param field field that contains array/map column message include max
repetition level and
+ * definition level.
+ * @param definitionLevels int array with each value's repetition level.
+ * @param repetitionLevels int array with each value's definition level.
Review Comment:
🤖 nit: the descriptions for `definitionLevels` and `repetitionLevels` look
swapped here — `definitionLevels` is described as "repetition level" and
`repetitionLevels` as "definition level". Could you flip the descriptions so
they match their param names?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]