This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b0eb2951c Allowing custom transformers to be passed to SegmentProcessingFramework (#11887)
7b0eb2951c is described below

commit 7b0eb2951c5d287b298fbe4fafaa26ad9d3a3f8a
Author: swaminathanmanish <[email protected]>
AuthorDate: Tue Oct 31 09:51:16 2023 -0700

    Allowing custom transformers to be passed to SegmentProcessingFramework (#11887)
    
    * Allowing custom transformers to be passed to SegmentProcessingFramework
    
    * Addressing comments
---
 .../framework/SegmentProcessorFramework.java       | 12 +++++++---
 .../segment/processing/mapper/SegmentMapper.java   |  8 +++++--
 .../processing/framework/SegmentMapperTest.java    |  2 +-
 .../framework/SegmentProcessorFrameworkTest.java   |  3 ++-
 .../recordtransformer/CompositeTransformer.java    | 28 ++++++++++++++++++----
 .../spi/data/readers/RecordReaderFileConfig.java   |  1 +
 6 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index cee643c529..fb5a08ff56 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordR
 import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
 import org.apache.pinot.core.segment.processing.reducer.Reducer;
 import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
 import 
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
 import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -60,6 +61,7 @@ public class SegmentProcessorFramework {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentProcessorFramework.class);
 
   private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
+  private final List<RecordTransformer> _customRecordTransformers;
   private final SegmentProcessorConfig _segmentProcessorConfig;
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
@@ -76,17 +78,20 @@ public class SegmentProcessorFramework {
   public SegmentProcessorFramework(List<RecordReader> recordReaders, 
SegmentProcessorConfig segmentProcessorConfig,
       File workingDir)
       throws IOException {
-    this(segmentProcessorConfig, workingDir, 
convertRecordReadersToRecordReaderFileConfig(recordReaders), null);
+    this(segmentProcessorConfig, workingDir, 
convertRecordReadersToRecordReaderFileConfig(recordReaders),
+        Collections.emptyList(), null);
   }
 
   public SegmentProcessorFramework(SegmentProcessorConfig 
segmentProcessorConfig, File workingDir,
-      List<RecordReaderFileConfig> recordReaderFileConfigs, 
SegmentNumRowProvider segmentNumRowProvider)
+      List<RecordReaderFileConfig> recordReaderFileConfigs, 
List<RecordTransformer> customRecordTransformers,
+      SegmentNumRowProvider segmentNumRowProvider)
       throws IOException {
 
     Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No 
recordReaderFileConfigs provided");
     LOGGER.info("Initializing SegmentProcessorFramework with {} record 
readers, config: {}, working dir: {}",
         recordReaderFileConfigs.size(), segmentProcessorConfig, 
workingDir.getAbsolutePath());
     _recordReaderFileConfigs = recordReaderFileConfigs;
+    _customRecordTransformers = customRecordTransformers;
 
     _segmentProcessorConfig = segmentProcessorConfig;
 
@@ -139,7 +144,8 @@ public class SegmentProcessorFramework {
       throws Exception {
     // Map phase
     LOGGER.info("Beginning map phase on {} record readers", 
_recordReaderFileConfigs.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, 
_segmentProcessorConfig, _mapperOutputDir);
+    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, 
_customRecordTransformers,
+        _segmentProcessorConfig, _mapperOutputDir);
     _partitionToFileManagerMap = mapper.map();
 
     // Check for mapper output files
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 00b61e04e8..b883d775cc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
 import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerFactory;
 import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -63,6 +64,7 @@ public class SegmentMapper {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentMapper.class);
 
   private List<RecordReaderFileConfig> _recordReaderFileConfigs;
+  private List<RecordTransformer> _customRecordTransformers;
   private final SegmentProcessorConfig _processorConfig;
   private final File _mapperOutputDir;
 
@@ -78,8 +80,9 @@ public class SegmentMapper {
   private final Map<String, GenericRowFileManager> _partitionToFileManagerMap 
= new TreeMap<>();
 
   public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
-      SegmentProcessorConfig processorConfig, File mapperOutputDir) {
+      List<RecordTransformer> customRecordTransformers, SegmentProcessorConfig 
processorConfig, File mapperOutputDir) {
     _recordReaderFileConfigs = recordReaderFileConfigs;
+    _customRecordTransformers = customRecordTransformers;
     _processorConfig = processorConfig;
     _mapperOutputDir = mapperOutputDir;
 
@@ -90,7 +93,7 @@ public class SegmentMapper {
     _fieldSpecs = pair.getLeft();
     _numSortFields = pair.getRight();
     _includeNullFields = 
tableConfig.getIndexingConfig().isNullHandlingEnabled();
-    _recordTransformer = 
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
+    _recordTransformer = 
CompositeTransformer.composeAllTransformers(_customRecordTransformers, 
tableConfig, schema);
     _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
     List<PartitionerConfig> partitionerConfigs = 
processorConfig.getPartitionerConfigs();
     int numPartitioners = partitionerConfigs.size();
@@ -132,6 +135,7 @@ public class SegmentMapper {
     for (RecordReaderFileConfig recordReaderFileConfig : 
_recordReaderFileConfigs) {
       RecordReader recordReader = recordReaderFileConfig._recordReader;
       if (recordReader == null) {
+        // We create and use the recordReader here.
         try {
           recordReader =
               
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, 
recordReaderFileConfig._dataFile,
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 3a3ee33398..db040232f9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -143,7 +143,7 @@ public class SegmentMapperTest {
     segmentRecordReader.init(_indexDir, null, null, true);
     SegmentMapper segmentMapper =
         new SegmentMapper(Collections.singletonList(new 
RecordReaderFileConfig(segmentRecordReader)),
-            processorConfig, mapperOutputDir);
+            Collections.emptyList(), processorConfig, mapperOutputDir);
     Map<String, GenericRowFileManager> partitionToFileManagerMap = 
segmentMapper.map();
     segmentRecordReader.close();
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 39b66fa4dd..43e5e9648a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
@@ -206,7 +207,7 @@ public class SegmentProcessorFrameworkTest {
     SegmentProcessorConfig config =
         new 
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
     SegmentProcessorFramework framework = new 
SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader),
-        null);
+        Collections.emptyList(), null);
     List<File> outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
     ImmutableSegment segment = 
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index dbc6cdbe3c..b6fb694306 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.recordtransformer;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -69,12 +70,29 @@ public class CompositeTransformer implements RecordTransformer {
    *   </li>
    * </ul>
    */
+  public static List<RecordTransformer> getDefaultTransformers(TableConfig 
tableConfig, Schema schema) {
+    return Stream.of(new ExpressionTransformer(tableConfig, schema), new 
FilterTransformer(tableConfig),
+        new SchemaConformingTransformer(tableConfig, schema), new 
DataTypeTransformer(tableConfig, schema),
+        new TimeValidationTransformer(tableConfig, schema), new 
NullValueTransformer(tableConfig, schema),
+        new SanitizationTransformer(schema)).filter(t -> 
!t.isNoOp()).collect(Collectors.toList());
+  }
+
   public static CompositeTransformer getDefaultTransformer(TableConfig 
tableConfig, Schema schema) {
-    return new CompositeTransformer(
-        Stream.of(new ExpressionTransformer(tableConfig, schema), new 
FilterTransformer(tableConfig),
-                new SchemaConformingTransformer(tableConfig, schema), new 
DataTypeTransformer(tableConfig, schema),
-                new TimeValidationTransformer(tableConfig, schema), new 
NullValueTransformer(tableConfig, schema),
-                new SanitizationTransformer(schema)).filter(t -> 
!t.isNoOp()).collect(Collectors.toList()));
+    return new CompositeTransformer(getDefaultTransformers(tableConfig, 
schema));
+  }
+
+  /**
+   * Includes custom and default transformers.
+   * @param customTransformers
+   * @param tableConfig
+   * @param schema
+   * @return
+   */
+  public static CompositeTransformer 
composeAllTransformers(List<RecordTransformer> customTransformers,
+      TableConfig tableConfig, Schema schema) {
+    List<RecordTransformer> allTransformers = new 
ArrayList<>(customTransformers);
+    allTransformers.addAll(getDefaultTransformers(tableConfig, schema));
+    return new CompositeTransformer(allTransformers);
   }
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index 628de1052a..8a2ebb9e16 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -34,6 +34,7 @@ public class RecordReaderFileConfig {
   public final File _dataFile;
   public final Set<String> _fieldsToRead;
   public final RecordReaderConfig _recordReaderConfig;
+  // Record Readers created/passed from clients.
   public final RecordReader _recordReader;
 
   // Pass in the info needed to initialize the reader


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to