Jackie-Jiang commented on a change in pull request #5238: Evaluate schema
transform expressions during ingestion
URL: https://github.com/apache/incubator-pinot/pull/5238#discussion_r407799267
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
##########
@@ -34,33 +33,31 @@
* regular column for other record transformers.
*/
public class ExpressionTransformer implements RecordTransformer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ExpressionTransformer.class);
- private final Map<String, FunctionExpressionEvaluator> _expressionEvaluators
= new HashMap<>();
+ private final Map<String, ExpressionEvaluator> _expressionEvaluators = new
HashMap<>();
+
+ private final String _timeColumn;
public ExpressionTransformer(Schema schema) {
+ _timeColumn = schema.getTimeColumnName();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- String expression = fieldSpec.getTransformFunction();
- if (expression != null) {
- try {
- _expressionEvaluators.put(fieldSpec.getName(), new
FunctionExpressionEvaluator(expression));
- } catch (Exception e) {
- LOGGER.error("Caught exception while constructing expression
evaluator for: {}, skipping", expression, e);
- }
- }
+ ExpressionEvaluator expressionEvaluator =
ExpressionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
+ if (expressionEvaluator != null) {
+ _expressionEvaluators.put(fieldSpec.getName(), expressionEvaluator);
}
}
}
@Override
public GenericRow transform(GenericRow record) {
- for (Map.Entry<String, FunctionExpressionEvaluator> entry :
_expressionEvaluators.entrySet()) {
+ for (Map.Entry<String, ExpressionEvaluator> entry :
_expressionEvaluators.entrySet()) {
String column = entry.getKey();
- // Skip transformation if column value already exist
+ ExpressionEvaluator transformExpressionEvaluator = entry.getValue();
+ // Skip transformation if column value already exist. Value can exist
for time transformation (incoming name = outgoing name)
// NOTE: column value might already exist for OFFLINE data
- if (record.getValue(column) == null) {
- record.putValue(column, entry.getValue().evaluate(record));
+ if (record.getValue(column) == null || column.equals(_timeColumn)) {
Review comment:
Don't special-case the time column here. When the destination column already
has a value, we should not evaluate the expression again.
The reason: for a hybrid table, the realtime and offline sides share the same
schema, and the batch ingestion has usually already pre-processed the data and
evaluated all the expressions.
Keeping this behavior imposes one limitation: the source column name must
differ from the destination column name, but I think that is acceptable.
We can also add a validation for that in the `SourceFieldNameExtractor`.
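
To make the suggestion concrete, here is a minimal sketch of the behavior described above. It does not use Pinot's classes: `SkipIfPresentSketch`, `Transform`, and `register` are hypothetical names, and a plain `Map<String, Object>` stands in for `GenericRow`. It only illustrates the two rules (skip evaluation when the destination already has a value, and reject a transform whose source and destination names are equal), not how the PR should implement them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the transformer; NOT Pinot's ExpressionTransformer.
class SkipIfPresentSketch {

  // Hypothetical single-source transform: reads one source column, computes the destination value.
  static final class Transform {
    final String sourceColumn;
    final Function<Object, Object> function;

    Transform(String sourceColumn, Function<Object, Object> function) {
      this.sourceColumn = sourceColumn;
      this.function = function;
    }
  }

  // Destination column name -> transform that produces it.
  private final Map<String, Transform> transforms = new LinkedHashMap<>();

  // Validation suggested in the review: source and destination names must differ,
  // otherwise "destination already has a value" cannot be told apart from "not yet transformed".
  void register(String destColumn, Transform transform) {
    if (destColumn.equals(transform.sourceColumn)) {
      throw new IllegalArgumentException(
          "Source and destination column must differ: " + destColumn);
    }
    transforms.put(destColumn, transform);
  }

  // Evaluates a transform only when the destination column has no value yet,
  // with no special case for any particular column.
  Map<String, Object> transform(Map<String, Object> record) {
    for (Map.Entry<String, Transform> entry : transforms.entrySet()) {
      String destColumn = entry.getKey();
      if (record.get(destColumn) == null) {
        Transform transform = entry.getValue();
        record.put(destColumn, transform.function.apply(record.get(transform.sourceColumn)));
      }
    }
    return record;
  }
}
```

With this shape, a realtime row that carries only the source column gets the destination computed, while a pre-processed offline row that already carries the destination is left untouched.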