npawar commented on a change in pull request #5238: Evaluate schema transform expressions during ingestion
URL: https://github.com/apache/incubator-pinot/pull/5238#discussion_r407826663
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
##########
@@ -34,33 +33,31 @@
* regular column for other record transformers.
*/
public class ExpressionTransformer implements RecordTransformer {
- private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);
- private final Map<String, FunctionExpressionEvaluator> _expressionEvaluators = new HashMap<>();
+ private final Map<String, ExpressionEvaluator> _expressionEvaluators = new HashMap<>();
+
+ private final String _timeColumn;
public ExpressionTransformer(Schema schema) {
+ _timeColumn = schema.getTimeColumnName();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- String expression = fieldSpec.getTransformFunction();
- if (expression != null) {
- try {
- _expressionEvaluators.put(fieldSpec.getName(), new FunctionExpressionEvaluator(expression));
- } catch (Exception e) {
- LOGGER.error("Caught exception while constructing expression evaluator for: {}, skipping", expression, e);
- }
- }
+ ExpressionEvaluator expressionEvaluator = ExpressionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
+ if (expressionEvaluator != null) {
+ _expressionEvaluators.put(fieldSpec.getName(), expressionEvaluator);
}
}
}
@Override
public GenericRow transform(GenericRow record) {
- for (Map.Entry<String, FunctionExpressionEvaluator> entry : _expressionEvaluators.entrySet()) {
+ for (Map.Entry<String, ExpressionEvaluator> entry : _expressionEvaluators.entrySet()) {
String column = entry.getKey();
- // Skip transformation if column value already exist
+ ExpressionEvaluator transformExpressionEvaluator = entry.getValue();
+ // Skip transformation if column value already exist. Value can exist for time transformation (incoming name = outgoing name)
// NOTE: column value might already exist for OFFLINE data
- if (record.getValue(column) == null) {
- record.putValue(column, entry.getValue().evaluate(record));
+ if (record.getValue(column) == null || column.equals(_timeColumn)) {
Review comment:
I added that precisely to handle the case you mentioned: the incoming and
outgoing specs are different but have the same name. If you're saying that we
don't need to handle that case, I can remove this.
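
To make the case concrete, here is a minimal sketch of the condition under
discussion. It does not use Pinot's classes: TimeColumnTransformSketch, the
plain Map record, and the Function evaluators are hypothetical stand-ins for
ExpressionTransformer, GenericRow and ExpressionEvaluator, and the
millis-to-hours conversion is only an assumed example of an incoming and
outgoing spec that share a name but differ in unit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of the transformer; not Pinot's API.
public class TimeColumnTransformSketch {
  // Column name -> evaluator that computes the column value from the row.
  private final Map<String, Function<Map<String, Object>, Object>> evaluators = new HashMap<>();
  private final String timeColumn;

  public TimeColumnTransformSketch(String timeColumn) {
    this.timeColumn = timeColumn;
  }

  public void addEvaluator(String column, Function<Map<String, Object>, Object> evaluator) {
    evaluators.put(column, evaluator);
  }

  public Map<String, Object> transform(Map<String, Object> record) {
    for (Map.Entry<String, Function<Map<String, Object>, Object>> entry : evaluators.entrySet()) {
      String column = entry.getKey();
      // Same shape as the condition in the diff: evaluate when the value is
      // missing, or when the column is the time column (whose value is already
      // present because the incoming name equals the outgoing name).
      if (record.get(column) == null || column.equals(timeColumn)) {
        record.put(column, entry.getValue().apply(record));
      }
    }
    return record;
  }

  // Builds a row where both "time" and "fullName" already have values.
  public static Map<String, Object> demo() {
    TimeColumnTransformSketch transformer = new TimeColumnTransformSketch("time");
    // Assumed example: incoming "time" is in millis, outgoing "time" is in hours.
    transformer.addEvaluator("time", row -> ((Long) row.get("time")) / 3_600_000L);
    transformer.addEvaluator("fullName", row -> "recomputed");

    Map<String, Object> record = new HashMap<>();
    record.put("time", 7_200_000L);
    record.put("fullName", "Ada");
    return transformer.transform(record);
  }
}
```

In this sketch, demo() converts "time" even though the record already carries
a value for it, while the existing "fullName" value is left alone. Without the
time-column clause, the "time" value would pass through in its incoming unit.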