codope commented on a change in pull request #5181:
URL: https://github.com/apache/hudi/pull/5181#discussion_r840224721
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
##########
@@ -39,6 +39,35 @@
private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
Collections.unmodifiableMap(initMap());
+ /**
+ * Converts provided microseconds (from epoch) to {@link Instant}
+ */
+ public static Instant microsToInstant(long microsFromEpoch) {
Review comment:
Can we use Avro's TimeConversions for these utils?
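For readers following the thread, a minimal stdlib-only sketch of what such a conversion does (the class name `MicrosSketch` is illustrative, not Hudi API; per the comment above, Avro's `TimeConversions` could replace this hand-rolled math):

```java
import java.time.Instant;

public class MicrosSketch {
  // Hypothetical helper: convert microseconds since the epoch to an Instant.
  // floorDiv/floorMod (rather than / and %) keep pre-epoch, i.e. negative,
  // timestamps correct.
  public static Instant microsToInstant(long microsFromEpoch) {
    long epochSeconds = Math.floorDiv(microsFromEpoch, 1_000_000L);
    long nanoAdjustment = Math.floorMod(microsFromEpoch, 1_000_000L) * 1_000L;
    return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
  }
}
```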
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1186,6 +1220,74 @@ public static void aggregateColumnStats(IndexedRecord record, List<Schema.Field>
}
}
+ /**
+ * Given a schema, coerces provided value to an instance of {@link Comparable<?>} such that
+ * it could subsequently be used in column stats
+ *
+ * NOTE: This method has to stay compatible with the semantic of
+ * {@link ParquetUtils#readRangeFromParquetMetadata} as they are used in tandem
+ */
+ private static Comparable<?> coerceToComparable(Schema schema, Object val) {
Review comment:
This can also go in HoodieAvroUtils
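A hedged stdlib-only sketch of the coercion idea (no Avro `Schema` dispatch here; the real method switches on the schema's type and logical type, while this toy version, with an illustrative class name, just pattern-matches on the runtime class):

```java
import java.nio.ByteBuffer;

public class CoerceSketch {
  // Illustrative coercion: wrap non-Comparable byte[] values, pass through
  // anything already Comparable, and fall back to the String form.
  public static Comparable<?> coerceToComparable(Object val) {
    if (val == null) {
      return null;
    }
    if (val instanceof byte[]) {
      return ByteBuffer.wrap((byte[]) val); // ByteBuffer implements Comparable
    }
    if (val instanceof Comparable) {
      return (Comparable<?>) val;
    }
    return val.toString();
  }
}
```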
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -108,6 +107,97 @@
protected static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
+ /**
+ * Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, pretending
+ * as if provided records have been persisted w/in given {@code filePath}
+ *
+ * @param records target records to compute column range metadata for
+ * @param targetFields columns (fields) to be collected
+ * @param filePath file path value required for {@link HoodieColumnRangeMetadata}
+ *
+ * @return map of {@link HoodieColumnRangeMetadata} for each of the provided target fields for
+ * the collection of provided records
+ */
+ public static Map<String, HoodieColumnRangeMetadata<Comparable>> collectColumnRangeMetadata(List<IndexedRecord> records,
+ List<Schema.Field> targetFields,
+ String filePath) {
+ // Helper class to calculate column stats
+ class ColumnStats {
Review comment:
Just wanna say that this whole method is much cleaner and readable than
what I originally had in mind. Really good example of clean code. Thanks a lot.
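For readers skimming the thread, the accumulation pattern can be reduced to a stdlib-only sketch (records modeled as plain column-name-to-value maps; all names are illustrative, not Hudi API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnStatsSketch {
  // Per-column accumulator, mirroring the local helper class in the diff
  public static class ColumnStats {
    public Comparable<Object> minValue;
    public Comparable<Object> maxValue;
    public long nullCount;
    public long valueCount;
  }

  // One pass over the records: for each target column, update that column's
  // running min/max and value/null counters.
  @SuppressWarnings("unchecked")
  public static Map<String, ColumnStats> collect(List<Map<String, Object>> records, List<String> columns) {
    Map<String, ColumnStats> allColumnStats = new HashMap<>();
    for (Map<String, Object> record : records) {
      for (String col : columns) {
        ColumnStats stats = allColumnStats.computeIfAbsent(col, ignored -> new ColumnStats());
        Object val = record.get(col);
        if (val != null) {
          Comparable<Object> cmp = (Comparable<Object>) val;
          if (stats.minValue == null || cmp.compareTo(stats.minValue) < 0) {
            stats.minValue = cmp;
          }
          if (stats.maxValue == null || cmp.compareTo(stats.maxValue) > 0) {
            stats.maxValue = cmp;
          }
          stats.valueCount++;
        } else {
          stats.nullCount++;
        }
      }
    }
    return allColumnStats;
  }
}
```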
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieSparkTypeUtils.scala
##########
@@ -15,13 +15,20 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.sql
-import org.apache.spark.sql.types.{DataType, NumericType, StringType}
+import org.apache.spark.sql.types.{DataType, DecimalType, NumericType, StringType}
// TODO unify w/ DataTypeUtils
object HoodieSparkTypeUtils {
+ /**
+ * Returns whether this DecimalType is wider than `other`. If yes, it means `other`
+ * can be casted into `this` safely without losing any precision or range.
+ */
+ def isWiderThan(one: DecimalType, another: DecimalType) =
Review comment:
not using this anywhere?
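For context, this kind of decimal-widening check boils down to comparing integer-digit capacity and scale. A hedged sketch with plain precision/scale ints instead of `DecimalType` (names illustrative):

```java
public class DecimalWidth {
  // (p1, s1) is wider than (p2, s2) iff it has at least as many integer
  // digits (precision - scale) and at least as large a scale, so every
  // value of the narrower type fits without loss.
  public static boolean isWiderThan(int p1, int s1, int p2, int s2) {
    return (p1 - s1) >= (p2 - s2) && s1 >= s2;
  }
}
```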
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1105,72 +1174,37 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
}
/**
- * Accumulates column range metadata for the given field and updates the column range map.
- *
- * @param field - column for which statistics will be computed
- * @param filePath - data file path
- * @param columnRangeMap - old column range statistics, which will be merged in this computation
- * @param columnToStats - map of column to map of each stat and its value
- */
- public static void accumulateColumnRanges(Schema.Field field, String filePath,
- Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
- Map<String, Map<String, Object>> columnToStats) {
- Map<String, Object> columnStats = columnToStats.get(field.name());
- HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create(
- filePath,
- field.name(),
- (Comparable) String.valueOf(columnStats.get(MIN)),
- (Comparable) String.valueOf(columnStats.get(MAX)),
- Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
- Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
- Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
- Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
- );
- columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
- }
-
- /**
- * Aggregates column stats for each field.
- *
- * @param record - current record
- * @param fields - fields for which stats will be aggregated
- * @param columnToStats - map of column to map of each stat and its value which gets updates in this method
- * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+ * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by
+ * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type
  */
- public static void aggregateColumnStats(IndexedRecord record, List<Schema.Field> fields,
- Map<String, Map<String, Object>> columnToStats,
- boolean consistentLogicalTimestampEnabled) {
- if (!(record instanceof GenericRecord)) {
- throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+ public static BigDecimal tryUpcastDecimal(BigDecimal value, final LogicalTypes.Decimal decimal) {
Review comment:
let's move this to `HoodieAvroUtils`?
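A hedged sketch of what such an upcast typically does, using plain precision/scale ints instead of Avro's `LogicalTypes.Decimal` (class and method names are illustrative):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalUpcast {
  // Rescale a BigDecimal to the target scale without rounding (increasing
  // the scale is always loss-free), then verify the widened value still
  // fits the target precision.
  public static BigDecimal tryUpcastDecimal(BigDecimal value, int targetPrecision, int targetScale) {
    BigDecimal upcast = value.setScale(targetScale, RoundingMode.UNNECESSARY);
    if (upcast.precision() > targetPrecision) {
      throw new ArithmeticException("Value " + value + " exceeds precision " + targetPrecision);
    }
    return upcast;
  }
}
```

`RoundingMode.UNNECESSARY` makes the rescale fail loudly if it would lose digits, which is the property an "upcast" must preserve.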
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
##########
@@ -15,18 +16,23 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.sql
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
/**
* Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
*/
object HoodieUnsafeRDDUtils {
+ // TODO scala-doc
+  def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], structType: StructType): DataFrame =
Review comment:
looks like it's being used only in tests, probably move it closer to
them?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -18,10 +18,29 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Conversions;
Review comment:
nit: shall we keep import rearrangements out of the diff? I think the convention is to keep Hudi imports at the top.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -108,6 +107,97 @@
protected static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
+ /**
+ * Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, pretending
+ * as if provided records have been persisted w/in given {@code filePath}
+ *
+ * @param records target records to compute column range metadata for
+ * @param targetFields columns (fields) to be collected
+ * @param filePath file path value required for {@link HoodieColumnRangeMetadata}
+ *
+ * @return map of {@link HoodieColumnRangeMetadata} for each of the provided target fields for
+ * the collection of provided records
+ */
+ public static Map<String, HoodieColumnRangeMetadata<Comparable>> collectColumnRangeMetadata(List<IndexedRecord> records,
+ List<Schema.Field> targetFields,
+ String filePath) {
+ // Helper class to calculate column stats
+ class ColumnStats {
+ Object minValue;
+ Object maxValue;
+ long nullCount;
+ long valueCount;
+ }
+
+ HashMap<String, ColumnStats> allColumnStats = new HashMap<>();
+
+ // Collect stats for all columns by iterating through records while accounting
+ // corresponding stats
+ records.forEach((record) -> {
+ // For each column (field) we have to index update corresponding column stats
+ // with the values from this record
+ targetFields.forEach(field -> {
+ ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), (ignored) -> new ColumnStats());
+
+ GenericRecord genericRecord = (GenericRecord) record;
+
+ final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), true);
+ final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
+
+ if (fieldVal != null && canCompare(fieldSchema)) {
+ // Set the min value of the field
+ if (colStats.minValue == null
+ || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.minValue, fieldSchema) < 0) {
+ colStats.minValue = fieldVal;
+ }
+
+ // Set the max value of the field
+ if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
+ colStats.maxValue = fieldVal;
+ }
+
+ colStats.valueCount++;
+ } else {
+ colStats.nullCount++;
+ }
+ });
+ });
+
+ Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
+ Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
+
+ return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
+ .map(field -> {
+ ColumnStats colStats = allColumnStats.get(field.name());
+ return HoodieColumnRangeMetadata.<Comparable>create(
+ filePath,
+ field.name(),
+ coerceToComparable(field.schema(), colStats.minValue),
+ coerceToComparable(field.schema(), colStats.maxValue),
+ colStats.nullCount,
+ colStats.valueCount,
+ 0,
Review comment:
let's make a note here why we keep it as 0.
Eventually, we'll have to revisit this and probably write our own calculator based on encoding, type, and order.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]