jadami10 commented on code in PR #16254:
URL: https://github.com/apache/pinot/pull/16254#discussion_r2854651826


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java:
##########
@@ -18,176 +18,142 @@
  */
 package org.apache.pinot.segment.local.segment.creator;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
-import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.local.recordtransformer.FilterTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformerUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.recordtransformer.RecordTransformer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * The class for transforming validating GenericRow data against table schema 
and table config.
  * It is used mainly but not limited by RealTimeDataManager for each row that 
is going to be indexed into Pinot.
  */
 public class TransformPipeline {
-  private final List<RecordTransformer> _preComplexTypeTransformers;
-  private final ComplexTypeTransformer _complexTypeTransformer;
-  private final RecordTransformer _recordTransformer;
-
-  /**
-   * Constructs a transform pipeline with customized RecordTransformer and 
customized ComplexTypeTransformer
-   * @param preComplexTypeTransformers the list of customized pre-complex type 
transformers
-   * @param complexTypeTransformer the customized complexType transformer
-   * @param recordTransformer the customized record transformer
-   */
-  public TransformPipeline(@Nullable List<RecordTransformer> 
preComplexTypeTransformers,
-      @Nullable ComplexTypeTransformer complexTypeTransformer, 
RecordTransformer recordTransformer) {
-    _preComplexTypeTransformers = preComplexTypeTransformers;
-    _complexTypeTransformer = complexTypeTransformer;
-    _recordTransformer = recordTransformer;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TransformPipeline.class);
+
+  private final String _tableNameWithType;
+  private final List<RecordTransformer> _transformers;
+  @Nullable
+  private final FilterTransformer _filterTransformer;
+
+  private long _numRowsProcessed;
+  private long _numRowsFiltered;
+  private long _numRowsIncomplete;
+  private long _numRowsSanitized;
+
+  public TransformPipeline(String tableNameWithType, List<RecordTransformer> 
transformers) {
+    _tableNameWithType = tableNameWithType;
+    _transformers = transformers;
+    FilterTransformer filterTransformer = null;
+    for (RecordTransformer recordTransformer : transformers) {
+      if (recordTransformer instanceof FilterTransformer) {
+        filterTransformer = (FilterTransformer) recordTransformer;
+        break;
+      }
+    }
+    _filterTransformer = filterTransformer;
   }
 
-  /**
-   * Constructs a transform pipeline based on TableConfig and table schema.
-   * @param tableConfig the config for the table
-   * @param schema the table schema
-   */
   public TransformPipeline(TableConfig tableConfig, Schema schema) {
-    // Create pre complex type transformers
-    _preComplexTypeTransformers = 
CompositeTransformer.getPreComplexTypeTransformers(tableConfig);
-
-    // Create complex type transformer
-    _complexTypeTransformer = 
ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
-
-    // Create record transformer
-    _recordTransformer = 
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
+    this(tableConfig.getTableName(), 
RecordTransformerUtils.getDefaultTransformers(tableConfig, schema));
   }
 
-  /**
-   * Returns a pass through pipeline that does not transform the record.
-   */
-  public static TransformPipeline getPassThroughPipeline() {
-    return new TransformPipeline(null, null, 
CompositeTransformer.getPassThroughTransformer());
+  public static TransformPipeline getPassThroughPipeline(String 
tableNameWithType) {
+    return new TransformPipeline(tableNameWithType, List.of());
   }
 
-  public Collection<String> getInputColumns() {
-    if (_preComplexTypeTransformers == null && _complexTypeTransformer == 
null) {
-      return _recordTransformer.getInputColumns();
-    }
-    Set<String> inputColumns = new 
HashSet<>(_recordTransformer.getInputColumns());
-    if (_preComplexTypeTransformers != null) {
-      for (RecordTransformer preComplexTypeTransformer : 
_preComplexTypeTransformers) {
-        inputColumns.addAll(preComplexTypeTransformer.getInputColumns());
-      }
-    }
-    if (_complexTypeTransformer != null) {
-      inputColumns.addAll(_complexTypeTransformer.getInputColumns());
+  public Set<String> getInputColumns() {
+    Set<String> inputColumns = new HashSet<>();
+    for (RecordTransformer transformer : _transformers) {
+      inputColumns.addAll(transformer.getInputColumns());
     }
     return inputColumns;
   }
 
-  /**
-   * Process and validate the decoded row against schema.
-   * @param decodedRow the row data to pass in
-   * @param reusedResult the reused result so we can reduce objects created 
for each row
-   * @throws Exception when data has issues like schema validation. Fetch the 
partialResult from Exception
-   */
-  public void processRow(GenericRow decodedRow, Result reusedResult)
-      throws Exception {
-    reusedResult.reset();
-
-    if (_preComplexTypeTransformers != null) {
-      for (RecordTransformer preComplexTypeTransformer : 
_preComplexTypeTransformers) {
-        decodedRow = preComplexTypeTransformer.transform(decodedRow);
+  public Result processRow(GenericRow decodedRow) {
+    if (Boolean.TRUE.equals(decodedRow.getValue(GenericRow.SKIP_RECORD_KEY))) {
+      return new Result(List.of(), 0, 0, 0);
+    }
+    //noinspection unchecked
+    List<GenericRow> rows = (List<GenericRow>) 
decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+    if (rows == null) {
+      rows = List.of(decodedRow);
+    }
+    _numRowsProcessed += rows.size();
+    for (RecordTransformer transformer : _transformers) {
+      rows = transformer.transform(rows);
+    }
+    int skippedRowCount = 0;
+    if (_filterTransformer != null) {
+      long numRowsFiltered = _filterTransformer.getNumRecordsFiltered();
+      skippedRowCount = (int) (numRowsFiltered - _numRowsFiltered);
+      _numRowsFiltered = numRowsFiltered;
+    }
+    int incompleteRowCount = 0;
+    int sanitizedRowCount = 0;
+    for (GenericRow record : rows) {
+      if (record.isIncomplete()) {
+        incompleteRowCount++;
+        _numRowsIncomplete++;
       }
-    }
-
-    if (_complexTypeTransformer != null) {
-      // TODO: consolidate complex type transformer into composite type 
transformer
-      decodedRow = _complexTypeTransformer.transform(decodedRow);
-    }
-
-    Collection<GenericRow> rows = (Collection<GenericRow>) 
decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
-
-    if (CollectionUtils.isNotEmpty(rows)) {
-      for (GenericRow row : rows) {
-        processPlainRow(row, reusedResult);
+      if (record.isSanitized()) {
+        sanitizedRowCount++;
+        _numRowsSanitized++;
       }
-    } else {
-      decodedRow.removeValue(GenericRow.MULTIPLE_RECORDS_KEY);
-      processPlainRow(decodedRow, reusedResult);
     }
+    return new Result(rows, skippedRowCount, incompleteRowCount, 
sanitizedRowCount);

Review Comment:
   This is also a bug: `skippedRowCount` and `incompleteRowCount` are swapped.
   We can fix this in the same PR that addresses the comment above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to