Jackie-Jiang commented on code in PR #17308:
URL: https://github.com/apache/pinot/pull/17308#discussion_r2615967861
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerUtils.java:
##########
@@ -89,6 +92,35 @@ public static List<RecordTransformer> getDefaultTransformers(TableConfig tableCo
return getTransformers(tableConfig, schema, false, false, false, false);
}
+ /**
+ * Returns transformers to apply after a partial upsert merge. Only post-merge transform configs are honored to avoid
+ * re-running ingestion-time transforms.
+ *
+ * @param tableConfig The table configuration containing post-update transform configs
+ * @param schema The table schema used for validation and type conversion
+ * @return List of transformers to apply after merge, or empty list if none configured
+ */
+ public static List<RecordTransformer> getPostPartialUpsertTransformers(TableConfig tableConfig, Schema schema) {
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ if (upsertConfig == null) {
+ return List.of();
+ }
+ List<TransformConfig> postUpdateTransformConfigs = upsertConfig.getPostPartialUpsertTransformConfigs();
+ if (CollectionUtils.isEmpty(postUpdateTransformConfigs)) {
+ return List.of();
+ }
+ List<RecordTransformer> transformers = new ArrayList<>();
+ addIfNotNoOp(transformers,
+ new ExpressionTransformer(tableConfig, schema, postUpdateTransformConfigs,
+ false /* includeFieldSpecTransforms */, true /* overwriteExistingValues */));
+ addIfNotNoOp(transformers, new DataTypeTransformer(tableConfig, schema));
Review Comment:
Ideally we don't want to re-apply the transformers to all columns. We can
address this in the future; please add a TODO for now.
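
For illustration only, here is a minimal sketch of what restricting the re-applied step to a subset of columns could look like. Everything in it is an assumption made for the example: `SelectiveConversionSketch`, `convertTouched`, the plain `Map` row, and the `converter` callback are hypothetical and are not part of Pinot or of this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical helper, not a Pinot class: re-applies a conversion only to
// the columns named in touchedColumns and leaves every other column alone.
final class SelectiveConversionSketch {
  static Map<String, Object> convertTouched(Map<String, Object> row, Set<String> touchedColumns,
      UnaryOperator<Object> converter) {
    // Copy first so the caller's row is not mutated.
    Map<String, Object> result = new LinkedHashMap<>(row);
    for (String column : touchedColumns) {
      // Columns that are absent from the row are skipped.
      if (result.containsKey(column)) {
        result.put(column, converter.apply(result.get(column)));
      }
    }
    return result;
  }
}
```

In this sketch the set of touched columns would come from the post-merge transform configs, so untouched columns never go through the conversion again.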
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -140,13 +158,18 @@ public void transform(GenericRow record) {
String column = entry.getKey();
FunctionEvaluator transformFunctionEvaluator = entry.getValue();
Object existingValue = record.getValue(column);
- if (existingValue == null) {
+ boolean shouldApplyTransform = _overwriteExistingValues || existingValue == null || record.isNullValue(column);
+ if (shouldApplyTransform) {
try {
// Skip transformation if column value already exists
// NOTE: column value might already exist for OFFLINE data,
// For backward compatibility, The only exception here is that we will override nested field like array,
// collection or map since they were not included in the record transformation before.
- record.putValue(column, transformFunctionEvaluator.evaluate(record));
+ Object transformedValue = transformFunctionEvaluator.evaluate(record);
+ if (transformedValue != null) {
+ record.removeNullValueField(column);
+ }
+ record.putValue(column, transformedValue);
Review Comment:
Do you want to remove the value instead of putting it when it is `null`?
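
To make the question concrete, here is a minimal sketch of one possible reading of it: a `null` result removes the stale entry rather than storing `null`. `SketchRow` and `PostMergeTransformSketch` are hypothetical stand-ins written for this example, not Pinot's `GenericRow` or `ExpressionTransformer`, and whether the null marker should also be set in that branch is left open.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for a row; it only mimics the calls used in the diff.
final class SketchRow {
  private final Map<String, Object> values = new HashMap<>();
  private final Set<String> nullFields = new HashSet<>();

  Object getValue(String column) { return values.get(column); }
  boolean hasValue(String column) { return values.containsKey(column); }
  boolean isNullValue(String column) { return nullFields.contains(column); }
  void putValue(String column, Object value) { values.put(column, value); }
  void removeValue(String column) { values.remove(column); }
  void addNullValueField(String column) { nullFields.add(column); }
  void removeNullValueField(String column) { nullFields.remove(column); }
}

// Hypothetical sketch of the transform step with the "remove on null" variant.
final class PostMergeTransformSketch {
  static void apply(SketchRow row, String column, Function<SketchRow, Object> evaluator,
      boolean overwriteExistingValues) {
    Object existing = row.getValue(column);
    boolean shouldApply = overwriteExistingValues || existing == null || row.isNullValue(column);
    if (!shouldApply) {
      return;
    }
    Object transformed = evaluator.apply(row);
    if (transformed != null) {
      // A real value replaces whatever was there and clears the null marker.
      row.removeNullValueField(column);
      row.putValue(column, transformed);
    } else {
      // Variant raised in the review: drop the stale value instead of storing null.
      row.removeValue(column);
    }
  }
}
```

With this variant, a derived column whose expression evaluates to `null` after the merge no longer carries the pre-merge value forward.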
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -54,24 +55,40 @@ public class ExpressionTransformer implements RecordTransformer {
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
private final boolean _continueOnError;
private final ThrottledLogger _throttledLogger;
+ /**
+ * When true, transforms run even if the column already has a non-null value. This is used for post-upsert transforms
+ * where derived columns should be recomputed on the merged row.
+ */
+ private final boolean _overwriteExistingValues;
public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
+ this(tableConfig, schema, null, true, false);
+ }
+
+ public ExpressionTransformer(TableConfig tableConfig, Schema schema,
Review Comment:
This is hard to understand. Can we add a constructor with only
`List<TransformConfig> transformConfigs, boolean overwriteExistingValues`?
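
As a rough sketch of the shape being asked for, the example below pairs a narrow two-argument constructor with a named factory so call sites do not pass bare boolean flags. `TransformConfigSketch`, `ExpressionTransformerSketch`, and `forPostPartialUpsert` are hypothetical names made up for this example; the real class would presumably still need the table config and schema, which are omitted here.

```java
import java.util.List;

// Hypothetical stand-in for a transform config: a column and its expression.
final class TransformConfigSketch {
  final String columnName;
  final String transformFunction;

  TransformConfigSketch(String columnName, String transformFunction) {
    this.columnName = columnName;
    this.transformFunction = transformFunction;
  }
}

// Hypothetical stand-in for the transformer, reduced to the two inputs
// named in the review comment.
final class ExpressionTransformerSketch {
  private final List<TransformConfigSketch> transformConfigs;
  private final boolean overwriteExistingValues;

  // Narrow constructor: only the configs and the overwrite flag.
  ExpressionTransformerSketch(List<TransformConfigSketch> transformConfigs, boolean overwriteExistingValues) {
    this.transformConfigs = List.copyOf(transformConfigs);
    this.overwriteExistingValues = overwriteExistingValues;
  }

  // Named factory so the post-merge call site reads without inline flag comments.
  static ExpressionTransformerSketch forPostPartialUpsert(List<TransformConfigSketch> transformConfigs) {
    return new ExpressionTransformerSketch(transformConfigs, true);
  }

  List<TransformConfigSketch> transformConfigs() { return transformConfigs; }
  boolean overwritesExistingValues() { return overwriteExistingValues; }
}
```

A call such as `ExpressionTransformerSketch.forPostPartialUpsert(configs)` then states the intent directly, where the current diff relies on `/* ... */` comments next to each boolean.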