This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a6ac89203b Use TransformPipeline in SegmentMapper (#14425)
a6ac89203b is described below

commit a6ac89203b70f3ac8d4967689cb785239880ca6a
Author: Shounak kulkarni <[email protected]>
AuthorDate: Tue Nov 19 15:17:12 2024 +0530

    Use TransformPipeline in SegmentMapper (#14425)
    
    * Add a constructor for passing a custom transform pipeline
    
    * Use TransformPipeline for all transformations
    
    * Drop validations that are already done in TransformPipeline
    
    * Allow passing custom TransformPipeline to SegmentProcessorFramework
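    
    For illustration, the new constructor lets a caller supply its own
    pipeline directly. A minimal sketch, not taken from the commit, assuming
    processorConfig, workingDir, recordReaderFileConfigs and numRowProvider
    (hypothetical names) are already set up by the caller and no extra custom
    RecordTransformers are needed:
    
        // Build the same pipeline the convenience constructor would build:
        // the table's composed transformers plus the complex-type transformer.
        TransformPipeline pipeline = new TransformPipeline(
            CompositeTransformer.composeAllTransformers(Collections.emptyList(),
                processorConfig.getTableConfig(), processorConfig.getSchema()),
            ComplexTypeTransformer.getComplexTypeTransformer(processorConfig.getTableConfig()));
    
        // New in this commit: pass the pipeline instead of a list of
        // custom RecordTransformers.
        SegmentProcessorFramework framework = new SegmentProcessorFramework(
            processorConfig, workingDir, recordReaderFileConfigs, pipeline, numRowProvider);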
---
 .../framework/SegmentProcessorFramework.java       | 31 ++++++++++-
 .../segment/processing/mapper/SegmentMapper.java   | 65 ++++++++--------------
 2 files changed, 51 insertions(+), 45 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 4b166b934a..4c9081a1ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -63,6 +63,7 @@ public class SegmentProcessorFramework {
 
   private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
   private final List<RecordTransformer> _customRecordTransformers;
+  private final TransformPipeline _transformPipeline;
   private final SegmentProcessorConfig _segmentProcessorConfig;
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
@@ -87,12 +88,28 @@ public class SegmentProcessorFramework {
       List<RecordReaderFileConfig> recordReaderFileConfigs, List<RecordTransformer> customRecordTransformers,
       SegmentNumRowProvider segmentNumRowProvider)
       throws IOException {
+    this(segmentProcessorConfig, workingDir, recordReaderFileConfigs, customRecordTransformers, null,
+        segmentNumRowProvider);
+  }
+
+  public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
+      List<RecordReaderFileConfig> recordReaderFileConfigs, TransformPipeline transformPipeline,
+      SegmentNumRowProvider segmentNumRowProvider)
+      throws IOException {
+    this(segmentProcessorConfig, workingDir, recordReaderFileConfigs, null, transformPipeline, segmentNumRowProvider);
+  }
+
+  protected SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
+      List<RecordReaderFileConfig> recordReaderFileConfigs, List<RecordTransformer> customRecordTransformers,
+      TransformPipeline transformPipeline, SegmentNumRowProvider segmentNumRowProvider)
+      throws IOException {
 
     Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
     LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
         recordReaderFileConfigs.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
     _recordReaderFileConfigs = recordReaderFileConfigs;
     _customRecordTransformers = customRecordTransformers;
+    _transformPipeline = transformPipeline;
 
     _segmentProcessorConfig = segmentProcessorConfig;
 
@@ -147,9 +164,8 @@ public class SegmentProcessorFramework {
 
     while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
       // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations.
-      SegmentMapper mapper =
-          new SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders),
-              _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir);
+      SegmentMapper mapper = getSegmentMapper(
+          _recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders));
 
       // Log start of iteration details only if intermediate file size threshold is set.
       if (isMapperOutputSizeThresholdEnabled) {
@@ -217,6 +233,15 @@ public class SegmentProcessorFramework {
     return outputSegmentDirs;
   }
 
+  protected SegmentMapper getSegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs) {
+    if (_transformPipeline != null) {
+      return new SegmentMapper(recordReaderFileConfigs, _transformPipeline, _segmentProcessorConfig, _mapperOutputDir);
+    } else {
+      return new SegmentMapper(recordReaderFileConfigs, _customRecordTransformers, _segmentProcessorConfig,
+          _mapperOutputDir);
+    }
+  }
+
   private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
     for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) {
       RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 84377b9021..49dfc42431 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.segment.processing.mapper;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -42,7 +41,7 @@ import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
-import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -71,21 +70,28 @@ public class SegmentMapper {
   private final boolean _includeNullFields;
   private final int _numSortFields;
   private final RecordEnricherPipeline _recordEnricherPipeline;
-  private final CompositeTransformer _recordTransformer;
-  private final ComplexTypeTransformer _complexTypeTransformer;
+  private final TransformPipeline _transformPipeline;
   private final TimeHandler _timeHandler;
   private final Partitioner[] _partitioners;
   private final String[] _partitionsBuffer;
   // NOTE: Use TreeMap so that the order is deterministic
   private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>();
-  private AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;
-  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
-  private List<RecordTransformer> _customRecordTransformers;
+  private final AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;
+  private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
 
   public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
       List<RecordTransformer> customRecordTransformers, SegmentProcessorConfig processorConfig, File mapperOutputDir) {
+    this(recordReaderFileConfigs,
+        new TransformPipeline(
+            CompositeTransformer.composeAllTransformers(customRecordTransformers, processorConfig.getTableConfig(),
+                processorConfig.getSchema()),
+            ComplexTypeTransformer.getComplexTypeTransformer(processorConfig.getTableConfig())),
+        processorConfig, mapperOutputDir);
+  }
+
+  public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs, TransformPipeline transformPipeline,
+      SegmentProcessorConfig processorConfig, File mapperOutputDir) {
     _recordReaderFileConfigs = recordReaderFileConfigs;
-    _customRecordTransformers = customRecordTransformers;
     _processorConfig = processorConfig;
     _mapperOutputDir = mapperOutputDir;
 
@@ -97,9 +103,8 @@ public class SegmentMapper {
     _numSortFields = pair.getRight();
     _includeNullFields =
         schema.isEnableColumnBasedNullHandling() || tableConfig.getIndexingConfig().isNullHandlingEnabled();
-    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
-    _recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
-    _complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
+    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(processorConfig.getTableConfig());
+    _transformPipeline = transformPipeline;
     _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
     List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
     int numPartitioners = partitionerConfigs.size();
@@ -168,9 +173,11 @@ public class SegmentMapper {
 
 //   Returns true if the map phase can continue, false if it should terminate based on the configured threshold for
 //   intermediate file size during map phase.
-  private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+  protected boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse,
       Consumer<Object> observer, int count, int totalCount) throws Exception {
     observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
+
+    TransformPipeline.Result reusedResult = new TransformPipeline.Result();
     boolean continueOnError =
         _processorConfig.getTableConfig().getIngestionConfig() != null && _processorConfig.getTableConfig()
             .getIngestionConfig().isContinueOnError();
@@ -180,13 +187,9 @@ public class SegmentMapper {
         reuse = recordReader.next(reuse);
         _recordEnricherPipeline.run(reuse);
 
-        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-          //noinspection unchecked
-          for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
-            transformAndWrite(row);
-          }
-        } else {
-          transformAndWrite(reuse);
+        _transformPipeline.processRow(reuse, reusedResult);
+        for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
+          writeRecord(transformedRow);
         }
       } catch (Exception e) {
         if (!continueOnError) {
@@ -210,29 +213,7 @@ public class SegmentMapper {
     return true;
   }
 
-  private void transformAndWrite(GenericRow row)
-      throws IOException {
-    GenericRow decodedRow = row;
-    if (_complexTypeTransformer != null) {
-      decodedRow = _complexTypeTransformer.transform(row);
-    }
-    Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
-    if (rows != null) {
-      for (GenericRow unrappedRow : rows) {
-        GenericRow transformedRow = _recordTransformer.transform(unrappedRow);
-        if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-          writeRecord(transformedRow);
-        }
-      }
-    } else {
-      GenericRow transformedRow = _recordTransformer.transform(row);
-      if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-        writeRecord(transformedRow);
-      }
-    }
-  }
-
-  private void writeRecord(GenericRow row)
+  protected void writeRecord(GenericRow row)
       throws IOException {
     String timePartition = _timeHandler.handleTime(row);
     if (timePartition == null) {
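
For reference, the per-row mapping flow after this change collapses into a
single TransformPipeline.processRow() call. A minimal sketch of the pattern,
not taken from the commit, assuming a pipeline and a reader are already
constructed, with error handling elided (handleRow is a hypothetical sink
standing in for SegmentMapper.writeRecord):

    void mapAll(RecordReader recordReader, TransformPipeline pipeline) throws Exception {
      TransformPipeline.Result reusedResult = new TransformPipeline.Result();
      GenericRow reuse = new GenericRow();
      while (recordReader.hasNext()) {
        reuse.clear();
        reuse = recordReader.next(reuse);
        // One call now covers the old MULTIPLE_RECORDS_KEY unwrapping, the
        // complex-type transform, and the shouldIngestRow() checks.
        pipeline.processRow(reuse, reusedResult);
        for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
          handleRow(transformedRow);
        }
      }
    }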


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
