Jackie-Jiang commented on a change in pull request #5238: Evaluate schema transform expressions during ingestion
URL: https://github.com/apache/incubator-pinot/pull/5238#discussion_r407838787
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
##########
@@ -34,33 +33,31 @@
* regular column for other record transformers.
*/
public class ExpressionTransformer implements RecordTransformer {
- private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);
- private final Map<String, FunctionExpressionEvaluator> _expressionEvaluators = new HashMap<>();
+ private final Map<String, ExpressionEvaluator> _expressionEvaluators = new HashMap<>();
+
+ private final String _timeColumn;
public ExpressionTransformer(Schema schema) {
+ _timeColumn = schema.getTimeColumnName();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- String expression = fieldSpec.getTransformFunction();
- if (expression != null) {
- try {
- _expressionEvaluators.put(fieldSpec.getName(), new FunctionExpressionEvaluator(expression));
- } catch (Exception e) {
- LOGGER.error("Caught exception while constructing expression evaluator for: {}, skipping", expression, e);
- }
- }
+ ExpressionEvaluator expressionEvaluator = ExpressionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
+ if (expressionEvaluator != null) {
+ _expressionEvaluators.put(fieldSpec.getName(), expressionEvaluator);
}
}
}
@Override
public GenericRow transform(GenericRow record) {
- for (Map.Entry<String, FunctionExpressionEvaluator> entry : _expressionEvaluators.entrySet()) {
+ for (Map.Entry<String, ExpressionEvaluator> entry : _expressionEvaluators.entrySet()) {
String column = entry.getKey();
- // Skip transformation if column value already exist
+ ExpressionEvaluator transformExpressionEvaluator = entry.getValue();
+ // Skip transformation if column value already exists. Value can exist for time transformation (incoming name = outgoing name)
// NOTE: column value might already exist for OFFLINE data
- if (record.getValue(column) == null) {
- record.putValue(column, entry.getValue().evaluate(record));
+ if (record.getValue(column) == null || column.equals(_timeColumn)) {
Review comment:
What I'm suggesting is not to force transformation for the time column, because of the scenario I described.
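The scenario the reviewer refers to is described earlier in the thread and is not included in this excerpt. As a rough sketch of the trade-off only, the hypothetical Java below models the transform loop with plain maps and functions instead of Pinot's `GenericRow` and `ExpressionEvaluator`. It assumes a time column whose incoming and outgoing names match (the case the new code comment mentions) and whose value might already be in the outgoing format (as the `NOTE` about OFFLINE data suggests). The column name `eventTime`, the millis-to-days conversion, and the `forceTimeColumn` flag are illustrative assumptions, not Pinot code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical, simplified model of the transform loop in ExpressionTransformer.
 * Rows are plain maps and evaluators are plain functions; these are not Pinot types.
 */
public class TimeTransformSketch {

  /** Hypothetical time evaluator: reads epoch millis from "eventTime", returns epoch days. */
  public static final Function<Map<String, Object>, Object> MILLIS_TO_DAYS =
      row -> (Long) row.get("eventTime") / 86_400_000L;

  /** One transform whose incoming and outgoing column names are both "eventTime". */
  public static Map<String, Function<Map<String, Object>, Object>> evaluators() {
    Map<String, Function<Map<String, Object>, Object>> evaluators = new LinkedHashMap<>();
    evaluators.put("eventTime", MILLIS_TO_DAYS);
    return evaluators;
  }

  /**
   * Evaluates each column that has no value yet. When forceTimeColumn is true, the time
   * column is evaluated even if it already holds a value, mirroring the condition in the diff.
   */
  public static Map<String, Object> transform(Map<String, Object> row,
      Map<String, Function<Map<String, Object>, Object>> evaluators,
      String timeColumn, boolean forceTimeColumn) {
    for (Map.Entry<String, Function<Map<String, Object>, Object>> entry : evaluators.entrySet()) {
      String column = entry.getKey();
      boolean forced = forceTimeColumn && column.equals(timeColumn);
      if (row.get(column) == null || forced) {
        row.put(column, entry.getValue().apply(row));
      }
    }
    return row;
  }

  /** Builds a single-column row holding the given time value. */
  private static Map<String, Object> row(long eventTime) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("eventTime", eventTime);
    return row;
  }

  public static void main(String[] args) {
    // Incoming millis, forced: converted to days. Prints {eventTime=18365}
    System.out.println(transform(row(1_586_736_000_000L), evaluators(), "eventTime", true));
    // Incoming millis, not forced: existing value kept. Prints {eventTime=1586736000000}
    System.out.println(transform(row(1_586_736_000_000L), evaluators(), "eventTime", false));
    // Already-converted days, forced: conversion runs again. Prints {eventTime=0}
    System.out.println(transform(row(18_365L), evaluators(), "eventTime", true));
  }
}
```

In this sketch, forcing the time column converts an incoming value that shares the outgoing name, but it also re-applies the conversion to a value that is already in the outgoing format; skipping existing values avoids the second problem at the cost of the first. Which of these cases matches the reviewer's scenario is not stated in this excerpt.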