This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b0eb2951c Allowing custom transformers to be passed to
SegmentProcessorFramework (#11887)
7b0eb2951c is described below
commit 7b0eb2951c5d287b298fbe4fafaa26ad9d3a3f8a
Author: swaminathanmanish <[email protected]>
AuthorDate: Tue Oct 31 09:51:16 2023 -0700
Allowing custom transformers to be passed to SegmentProcessorFramework
(#11887)
* Allowing custom transformers to be passed to SegmentProcessorFramework
* Addressing comments
---
.../framework/SegmentProcessorFramework.java | 12 +++++++---
.../segment/processing/mapper/SegmentMapper.java | 8 +++++--
.../processing/framework/SegmentMapperTest.java | 2 +-
.../framework/SegmentProcessorFrameworkTest.java | 3 ++-
.../recordtransformer/CompositeTransformer.java | 28 ++++++++++++++++++----
.../spi/data/readers/RecordReaderFileConfig.java | 1 +
6 files changed, 42 insertions(+), 12 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index cee643c529..fb5a08ff56 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -33,6 +33,7 @@ import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordR
import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.reducer.Reducer;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -60,6 +61,7 @@ public class SegmentProcessorFramework {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentProcessorFramework.class);
private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
+ private final List<RecordTransformer> _customRecordTransformers;
private final SegmentProcessorConfig _segmentProcessorConfig;
private final File _mapperOutputDir;
private final File _reducerOutputDir;
@@ -76,17 +78,20 @@ public class SegmentProcessorFramework {
public SegmentProcessorFramework(List<RecordReader> recordReaders,
SegmentProcessorConfig segmentProcessorConfig,
File workingDir)
throws IOException {
- this(segmentProcessorConfig, workingDir,
convertRecordReadersToRecordReaderFileConfig(recordReaders), null);
+ this(segmentProcessorConfig, workingDir,
convertRecordReadersToRecordReaderFileConfig(recordReaders),
+ Collections.emptyList(), null);
}
public SegmentProcessorFramework(SegmentProcessorConfig
segmentProcessorConfig, File workingDir,
- List<RecordReaderFileConfig> recordReaderFileConfigs,
SegmentNumRowProvider segmentNumRowProvider)
+ List<RecordReaderFileConfig> recordReaderFileConfigs,
List<RecordTransformer> customRecordTransformers,
+ SegmentNumRowProvider segmentNumRowProvider)
throws IOException {
Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No
recordReaderFileConfigs provided");
LOGGER.info("Initializing SegmentProcessorFramework with {} record
readers, config: {}, working dir: {}",
recordReaderFileConfigs.size(), segmentProcessorConfig,
workingDir.getAbsolutePath());
_recordReaderFileConfigs = recordReaderFileConfigs;
+ _customRecordTransformers = customRecordTransformers;
_segmentProcessorConfig = segmentProcessorConfig;
@@ -139,7 +144,8 @@ public class SegmentProcessorFramework {
throws Exception {
// Map phase
LOGGER.info("Beginning map phase on {} record readers",
_recordReaderFileConfigs.size());
- SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs,
_segmentProcessorConfig, _mapperOutputDir);
+ SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs,
_customRecordTransformers,
+ _segmentProcessorConfig, _mapperOutputDir);
_partitionToFileManagerMap = mapper.map();
// Check for mapper output files
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 00b61e04e8..b883d775cc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -38,6 +38,7 @@ import
org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerFactory;
import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -63,6 +64,7 @@ public class SegmentMapper {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentMapper.class);
private List<RecordReaderFileConfig> _recordReaderFileConfigs;
+ private List<RecordTransformer> _customRecordTransformers;
private final SegmentProcessorConfig _processorConfig;
private final File _mapperOutputDir;
@@ -78,8 +80,9 @@ public class SegmentMapper {
private final Map<String, GenericRowFileManager> _partitionToFileManagerMap
= new TreeMap<>();
public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
- SegmentProcessorConfig processorConfig, File mapperOutputDir) {
+ List<RecordTransformer> customRecordTransformers, SegmentProcessorConfig
processorConfig, File mapperOutputDir) {
_recordReaderFileConfigs = recordReaderFileConfigs;
+ _customRecordTransformers = customRecordTransformers;
_processorConfig = processorConfig;
_mapperOutputDir = mapperOutputDir;
@@ -90,7 +93,7 @@ public class SegmentMapper {
_fieldSpecs = pair.getLeft();
_numSortFields = pair.getRight();
_includeNullFields =
tableConfig.getIndexingConfig().isNullHandlingEnabled();
- _recordTransformer =
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
+ _recordTransformer =
CompositeTransformer.composeAllTransformers(_customRecordTransformers,
tableConfig, schema);
_timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
List<PartitionerConfig> partitionerConfigs =
processorConfig.getPartitionerConfigs();
int numPartitioners = partitionerConfigs.size();
@@ -132,6 +135,7 @@ public class SegmentMapper {
for (RecordReaderFileConfig recordReaderFileConfig :
_recordReaderFileConfigs) {
RecordReader recordReader = recordReaderFileConfig._recordReader;
if (recordReader == null) {
+ // We create and use the recordReader here.
try {
recordReader =
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat,
recordReaderFileConfig._dataFile,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 3a3ee33398..db040232f9 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -143,7 +143,7 @@ public class SegmentMapperTest {
segmentRecordReader.init(_indexDir, null, null, true);
SegmentMapper segmentMapper =
new SegmentMapper(Collections.singletonList(new
RecordReaderFileConfig(segmentRecordReader)),
- processorConfig, mapperOutputDir);
+ Collections.emptyList(), processorConfig, mapperOutputDir);
Map<String, GenericRowFileManager> partitionToFileManagerMap =
segmentMapper.map();
segmentRecordReader.close();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 39b66fa4dd..43e5e9648a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
@@ -206,7 +207,7 @@ public class SegmentProcessorFrameworkTest {
SegmentProcessorConfig config =
new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
SegmentProcessorFramework framework = new
SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader),
- null);
+ Collections.emptyList(), null);
List<File> outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
ImmutableSegment segment =
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index dbc6cdbe3c..b6fb694306 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.recordtransformer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -69,12 +70,29 @@ public class CompositeTransformer implements
RecordTransformer {
* </li>
* </ul>
*/
+ public static List<RecordTransformer> getDefaultTransformers(TableConfig
tableConfig, Schema schema) {
+ return Stream.of(new ExpressionTransformer(tableConfig, schema), new
FilterTransformer(tableConfig),
+ new SchemaConformingTransformer(tableConfig, schema), new
DataTypeTransformer(tableConfig, schema),
+ new TimeValidationTransformer(tableConfig, schema), new
NullValueTransformer(tableConfig, schema),
+ new SanitizationTransformer(schema)).filter(t ->
!t.isNoOp()).collect(Collectors.toList());
+ }
+
public static CompositeTransformer getDefaultTransformer(TableConfig
tableConfig, Schema schema) {
- return new CompositeTransformer(
- Stream.of(new ExpressionTransformer(tableConfig, schema), new
FilterTransformer(tableConfig),
- new SchemaConformingTransformer(tableConfig, schema), new
DataTypeTransformer(tableConfig, schema),
- new TimeValidationTransformer(tableConfig, schema), new
NullValueTransformer(tableConfig, schema),
- new SanitizationTransformer(schema)).filter(t ->
!t.isNoOp()).collect(Collectors.toList()));
+ return new CompositeTransformer(getDefaultTransformers(tableConfig,
schema));
+ }
+
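+ /**
+ * Composes the custom transformers, followed by the default ones, into a single CompositeTransformer.
+ * @param customTransformers transformers placed ahead of the default transformers; must not be null
+ * @param tableConfig table config used to create the default transformers
+ * @param schema schema used to create the default transformers
+ * @return CompositeTransformer over the custom transformers followed by the default transformers
+ */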
+ public static CompositeTransformer
composeAllTransformers(List<RecordTransformer> customTransformers,
+ TableConfig tableConfig, Schema schema) {
+ List<RecordTransformer> allTransformers = new
ArrayList<>(customTransformers);
+ allTransformers.addAll(getDefaultTransformers(tableConfig, schema));
+ return new CompositeTransformer(allTransformers);
}
/**
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index 628de1052a..8a2ebb9e16 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -34,6 +34,7 @@ public class RecordReaderFileConfig {
public final File _dataFile;
public final Set<String> _fieldsToRead;
public final RecordReaderConfig _recordReaderConfig;
+ // Record Readers created/passed from clients.
public final RecordReader _recordReader;
// Pass in the info needed to initialize the reader
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]