mcvsubbu commented on a change in pull request #5597: URL: https://github.com/apache/incubator-pinot/pull/5597#discussion_r443152690
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java ########## @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.util; + +import com.google.common.collect.Sets; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.core.data.function.FunctionEvaluator; +import org.apache.pinot.core.data.function.FunctionEvaluatorFactory; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** + * Utility methods for extracting source and destination fields from ingestion configs + */ +public class IngestionUtils { + + /** + * Extracts all fields required by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given TableConfig and Schema + */ + public static Set<String> getFieldsForRecordExtractor(TableConfig tableConfig, Schema schema) { Review comment: Consider making two methods below public, for readability. getFieldsToFilterOn() getFieldsToExtract() Or, keep one in the schema and the other here, but that may lead callers to ignore this one. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java ########## @@ -245,7 +245,8 @@ public void run() { if (consumedRow != null) { try { GenericRow transformedRow = _recordTransformer.transform(consumedRow); - if (transformedRow != null) { + // FIXME: MULTIPLE_RECORDS_KEY is not handled here Review comment: On this note, can you add some comments on GenericRow as to how MULTIPLE_RECORDS_KEY is used for (in GenericRow class) and also document it in the decoders description in the documentation? It seems to be used only in test files now, and there is no doc on what it means. Thanks. It looks like the MULTIPLE stuff is a decoder feature. So, when you add a FIXME like this, does this mean that the decoders using multiple recorsd key will break? Or, will they simply not be able to use filters? When is this planned to be fixed? Is this just an interim state from which we should take care not to cut a release? Is this work in progress? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java ########## @@ -471,7 +471,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow); - if (transformedRow != null) { + if (transformedRow != null && IngestionUtils.passedFilter(transformedRow)) { Review comment: ```suggestion if (transformedRow != null && IngestionUtils.isRowPruned(transformedRow)) { ``` Or, `IngestionUtils.shouldIngestRow()`? Just a suggestion, you decide ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java ########## @@ -1185,12 +1185,12 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, .setConsumerDir(consumerDir); // Create message decoder - _messageDecoder = - StreamDecoderProvider.create(_partitionLevelStreamConfig, SchemaUtils.extractSourceFields(_schema)); + _messageDecoder = StreamDecoderProvider + .create(_partitionLevelStreamConfig, IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema)); Review comment: Same comment as before ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java ########## @@ -169,9 +169,9 @@ public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata realtimeSegm // create and init stream level consumer StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName(); - _streamLevelConsumer = streamConsumerFactory - .createStreamLevelConsumer(clientId, _tableNameWithType, SchemaUtils.extractSourceFields(schema), - instanceMetadata.getGroupId(_tableNameWithType)); + _streamLevelConsumer = streamConsumerFactory.createStreamLevelConsumer(clientId, _tableNameWithType, + IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema), Review comment: I think I have the answer for my question. I think this is because we may have filter expressions on columns that are not present in the schema. If this explanation is right, can you please document it some place? (Perhaps in IngestionUtils class?) Also, it may be worthwhile to make two separate calls, and let the caller decide whether to union the fields or otherwise use them independently. Not sure if there is a use case, but think about it. It certainly makes the code more readable. getFieldsToExtractFromStream(), and getFieldsToFilterOn(). Lastly, is it enough to pass in the IngestionConfig (or, maybe even just the FilterConfig) rather than entire TableConfig? This will keep the discipline of adding any ingestion related item to IngestionConfig than looking some place else in the TableConfig ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java ########## @@ -41,28 +39,6 @@ private static final Logger LOGGER = LoggerFactory.getLogger(SchemaUtils.class); - /** - * Extracts the source fields and destination fields from the schema - * For field specs with a transform expression defined, use the arguments provided to the function - * By default, add the field spec name - * - * TODO: for now, we assume that arguments to transform function are in the source i.e. there's no columns which are derived from transformed columns - */ - public static Set<String> extractSourceFields(Schema schema) { Review comment: Spark/Hadoop jobs that depend on this method may break. Consider deprecating it in stead of removing it. Or, atleast add a backward incompat ########## File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/FilterConfig.java ########## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table.ingestion; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +/** + * Configs related to filtering records during ingestion + */ +public class FilterConfig extends BaseJsonConfig { + + @JsonPropertyDescription("Filter function string. Filter out records during ingestion, if this evaluates to true") + private final String _filterFunction; Review comment: Shouldn't this be a list of filter functions applied in sequence specified? How do we indicate filter like: 1. Sample x% of the records 2. filter out those with age > 40 Or, 1. Filter out those with age > 40 2. Sample x% of these records. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java ########## @@ -169,9 +169,9 @@ public HLRealtimeSegmentDataManager(final RealtimeSegmentZKMetadata realtimeSegm // create and init stream level consumer StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName(); - _streamLevelConsumer = streamConsumerFactory - .createStreamLevelConsumer(clientId, _tableNameWithType, SchemaUtils.extractSourceFields(schema), - instanceMetadata.getGroupId(_tableNameWithType)); + _streamLevelConsumer = streamConsumerFactory.createStreamLevelConsumer(clientId, _tableNameWithType, + IngestionUtils.getFieldsForRecordExtractor(tableConfig, schema), Review comment: High level comment, recording it here so that I dont forget. Intuitively, rows ingested should be indicated by the schema, not by whether the row is filtered or not. So, I am not sure what this change implies. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
