voonhous commented on code in PR #18837:
URL: https://github.com/apache/hudi/pull/18837#discussion_r3467022363
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -397,4 +420,104 @@ public static Schema
getLatestTableSchema(HoodieTableMetaClient metaClient, Stri
throw new TrinoException(HUDI_FILESYSTEM_ERROR, e);
}
}
+
+ /**
+ * Returns the column handles that must be present in the read schema for
the file group reader to merge
+ * correctly: the ordering columns, plus the mandatory merge columns
declared by a configured custom record
+ * merger (via {@link HoodieRecordMerger#getMandatoryFieldsForMerging}).
+ * <p>
+   * For EVENT_TIME tables this is exactly the ordering columns (so behavior is
+   * unchanged); for COMMIT_TIME tables, or when no merge mode is set, no ordering
+   * columns are added. For a
+ * CUSTOM merge mode with a registered merger, it additionally includes
any data columns the merger reads at
+ * merge time (e.g. an arbitrary decision column) so that those columns
are read from the base file even when
+ * the query does not project them -- without this the merger would see
null for an un-projected column.
+ */
+ public static List<HiveColumnHandle> getMergeRequiredColumnHandles(
+ Table table,
+ TypeManager typeManager,
+ Lazy<HoodieTableMetaClient> lazyMetaClient,
+ List<String> recordMergerImpls,
+ HiveTimestampPrecision timestampPrecision)
+ {
+ HoodieTableMetaClient metaClient = lazyMetaClient.get();
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
+
+ LinkedHashSet<String> requiredColumnNames = new LinkedHashSet<>();
+ if (recordMergeMode != null && recordMergeMode !=
RecordMergeMode.COMMIT_TIME_ORDERING) {
+ requiredColumnNames.addAll(tableConfig.getOrderingFields());
+ }
+
+ // For a CUSTOM merge mode, ask the configured merger which fields it
needs at merge time and include them
+ // so they are read even when not projected. Only the merger's
declared columns are added (not all columns),
+ // so non-custom tables and mergers that only use the key/ordering
fields incur no extra reads.
+ if (recordMergeMode == RecordMergeMode.CUSTOM && recordMergerImpls !=
null && !recordMergerImpls.isEmpty()) {
+ Option<HoodieRecordMerger> merger =
HoodieRecordUtils.createValidRecordMerger(
+ EngineType.JAVA, String.join(",", recordMergerImpls),
tableConfig.getRecordMergeStrategyId());
+ if (merger.isPresent()) {
+ TypedProperties props = new TypedProperties();
+ props.putAll(tableConfig.getProps());
+ props.setProperty(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY,
String.join(",", recordMergerImpls));
+ HoodieSchema tableSchema;
+ try {
+ tableSchema = new
TableSchemaResolver(metaClient).getTableSchema();
+ }
+ catch (Exception e) {
+ throw new TrinoException(HUDI_SCHEMA_ERROR, "Failed to
resolve table schema for merge column resolution", e);
+ }
+ String[] mandatoryFields =
merger.get().getMandatoryFieldsForMerging(tableSchema, tableConfig, props);
+ if (mandatoryFields != null) {
+ Collections.addAll(requiredColumnNames, mandatoryFields);
+ }
+ }
+ }
+
+ if (requiredColumnNames.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return buildColumnHandles(table, typeManager, requiredColumnNames,
timestampPrecision);
+ }
+
+ /**
+ * Builds {@link HiveColumnHandle}s, preserving physical (data-column)
index, for the data columns whose names
+ * appear in {@code columnNames}. Names that are not data columns (e.g.
Hudi meta fields) or whose types are not
+ * supported by the storage format are skipped.
+ */
+ private static List<HiveColumnHandle> buildColumnHandles(Table table,
TypeManager typeManager, Set<String> columnNames, HiveTimestampPrecision
timestampPrecision)
+ {
+ ImmutableList.Builder<HiveColumnHandle> columns =
ImmutableList.builder();
+ int hiveColumnIndex = 0;
+ for (Column field : table.getDataColumns()) {
+      // only requested columns are considered; of those, unsupported types are skipped rather than failing
+ if (columnNames.contains(field.getName())) {
+ HiveType hiveType = field.getType();
+ if (typeSupported(hiveType.getTypeInfo(),
table.getStorage().getStorageFormat())) {
+ columns.add(createBaseColumn(field.getName(),
hiveColumnIndex, hiveType, getType(hiveType, typeManager, timestampPrecision),
REGULAR, field.getComment()));
+ }
+ }
+ hiveColumnIndex++;
+ }
+ return columns.build();
+ }
Review Comment:
Done, moved the comment next to the `typeSupported(...)` guard it actually
describes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]