This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a6ac89203b Use TransformPipeline in SegmentMapper (#14425)
a6ac89203b is described below
commit a6ac89203b70f3ac8d4967689cb785239880ca6a
Author: Shounak kulkarni <[email protected]>
AuthorDate: Tue Nov 19 15:17:12 2024 +0530
Use TransformPipeline in SegmentMapper (#14425)
* Add a constructor that accepts a custom TransformPipeline
* Use TransformPipeline for all transformations
* Remove row validations that TransformPipeline already performs
* Allow passing custom TransformPipeline to SegmentProcessorFramework
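
For reference, the new public overload replaces the transformer list with a prebuilt TransformPipeline, and the existing process() entry point still returns the output segment directories. A minimal call-site sketch, assuming processorConfig, workingDir, readerConfigs, customTransformers, tableConfig, schema, and numRowProvider are built elsewhere (these variable names are illustrative, not from the patch):

    // Compose a pipeline the same way the compatibility constructor in this patch
    // does: composite record transformers plus an optional complex-type transformer.
    TransformPipeline transformPipeline = new TransformPipeline(
        CompositeTransformer.composeAllTransformers(customTransformers, tableConfig, schema),
        ComplexTypeTransformer.getComplexTypeTransformer(tableConfig));

    // Hand the pipeline to the framework through the new constructor overload.
    SegmentProcessorFramework framework = new SegmentProcessorFramework(
        processorConfig, workingDir, readerConfigs, transformPipeline, numRowProvider);
    List<File> outputSegmentDirs = framework.process();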
---
.../framework/SegmentProcessorFramework.java | 31 ++++++++++-
.../segment/processing/mapper/SegmentMapper.java | 65 ++++++++--------------
2 files changed, 51 insertions(+), 45 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 4b166b934a..4c9081a1ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -63,6 +63,7 @@ public class SegmentProcessorFramework {
   private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
   private final List<RecordTransformer> _customRecordTransformers;
+  private final TransformPipeline _transformPipeline;
   private final SegmentProcessorConfig _segmentProcessorConfig;
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
@@ -87,12 +88,28 @@ public class SegmentProcessorFramework {
       List<RecordReaderFileConfig> recordReaderFileConfigs, List<RecordTransformer> customRecordTransformers,
       SegmentNumRowProvider segmentNumRowProvider)
       throws IOException {
+    this(segmentProcessorConfig, workingDir, recordReaderFileConfigs, customRecordTransformers, null,
+        segmentNumRowProvider);
+  }
+
+  public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
+      List<RecordReaderFileConfig> recordReaderFileConfigs, TransformPipeline transformPipeline,
+      SegmentNumRowProvider segmentNumRowProvider)
+      throws IOException {
+    this(segmentProcessorConfig, workingDir, recordReaderFileConfigs, null, transformPipeline, segmentNumRowProvider);
+  }
+
+  protected SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
+      List<RecordReaderFileConfig> recordReaderFileConfigs, List<RecordTransformer> customRecordTransformers,
+      TransformPipeline transformPipeline, SegmentNumRowProvider segmentNumRowProvider)
+      throws IOException {
     Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
 
     LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
         recordReaderFileConfigs.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
 
     _recordReaderFileConfigs = recordReaderFileConfigs;
     _customRecordTransformers = customRecordTransformers;
+    _transformPipeline = transformPipeline;
     _segmentProcessorConfig = segmentProcessorConfig;
@@ -147,9 +164,8 @@ public class SegmentProcessorFramework {
     while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
       // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations.
-      SegmentMapper mapper =
-          new SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders),
-              _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir);
+      SegmentMapper mapper = getSegmentMapper(
+          _recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders));
 
       // Log start of iteration details only if intermediate file size threshold is set.
       if (isMapperOutputSizeThresholdEnabled) {
@@ -217,6 +233,15 @@ public class SegmentProcessorFramework {
     return outputSegmentDirs;
   }
 
+  protected SegmentMapper getSegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs) {
+    if (_transformPipeline != null) {
+      return new SegmentMapper(recordReaderFileConfigs, _transformPipeline, _segmentProcessorConfig, _mapperOutputDir);
+    } else {
+      return new SegmentMapper(recordReaderFileConfigs, _customRecordTransformers, _segmentProcessorConfig,
+          _mapperOutputDir);
+    }
+  }
+
   private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
     for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) {
       RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i);
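
Because getSegmentMapper() is protected, a subclass can now intercept or replace mapper creation entirely. A minimal sketch of such an override (the subclass name and body are hypothetical, not part of the patch):

    // Hypothetical subclass: observe mapper creation, then fall back to the
    // default pipeline-vs-transformers selection implemented above.
    public class AuditingSegmentProcessorFramework extends SegmentProcessorFramework {
      public AuditingSegmentProcessorFramework(SegmentProcessorConfig config, File workingDir,
          List<RecordReaderFileConfig> readerConfigs, TransformPipeline pipeline,
          SegmentNumRowProvider numRowProvider)
          throws IOException {
        super(config, workingDir, readerConfigs, pipeline, numRowProvider);
      }

      @Override
      protected SegmentMapper getSegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs) {
        // e.g. record metrics here, or return a custom SegmentMapper subclass
        return super.getSegmentMapper(recordReaderFileConfigs);
      }
    }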
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 84377b9021..49dfc42431 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.segment.processing.mapper;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -42,7 +41,7 @@ import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
-import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -71,21 +70,28 @@ public class SegmentMapper {
   private final boolean _includeNullFields;
   private final int _numSortFields;
   private final RecordEnricherPipeline _recordEnricherPipeline;
-  private final CompositeTransformer _recordTransformer;
-  private final ComplexTypeTransformer _complexTypeTransformer;
+  private final TransformPipeline _transformPipeline;
   private final TimeHandler _timeHandler;
   private final Partitioner[] _partitioners;
   private final String[] _partitionsBuffer;
   // NOTE: Use TreeMap so that the order is deterministic
   private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>();
-  private AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;
-  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
-  private List<RecordTransformer> _customRecordTransformers;
+  private final AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;
+  private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
 
   public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
       List<RecordTransformer> customRecordTransformers, SegmentProcessorConfig processorConfig, File mapperOutputDir) {
+    this(recordReaderFileConfigs,
+        new TransformPipeline(
+            CompositeTransformer.composeAllTransformers(customRecordTransformers, processorConfig.getTableConfig(),
+                processorConfig.getSchema()),
+            ComplexTypeTransformer.getComplexTypeTransformer(processorConfig.getTableConfig())),
+        processorConfig, mapperOutputDir);
+  }
+
+  public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs, TransformPipeline transformPipeline,
+      SegmentProcessorConfig processorConfig, File mapperOutputDir) {
     _recordReaderFileConfigs = recordReaderFileConfigs;
-    _customRecordTransformers = customRecordTransformers;
     _processorConfig = processorConfig;
     _mapperOutputDir = mapperOutputDir;
@@ -97,9 +103,8 @@ public class SegmentMapper {
     _numSortFields = pair.getRight();
     _includeNullFields =
         schema.isEnableColumnBasedNullHandling() || tableConfig.getIndexingConfig().isNullHandlingEnabled();
-    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
-    _recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
-    _complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
+    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(processorConfig.getTableConfig());
+    _transformPipeline = transformPipeline;
     _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
     List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
     int numPartitioners = partitionerConfigs.size();
@@ -168,9 +173,11 @@
   // Returns true if the map phase can continue, false if it should terminate based on the configured threshold for
   // intermediate file size during map phase.
-  private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+  protected boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse,
       Consumer<Object> observer, int count, int totalCount) throws Exception {
     observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
+
+    TransformPipeline.Result reusedResult = new TransformPipeline.Result();
     boolean continueOnError =
         _processorConfig.getTableConfig().getIngestionConfig() != null && _processorConfig.getTableConfig()
             .getIngestionConfig().isContinueOnError();
@@ -180,13 +187,9 @@ public class SegmentMapper {
         reuse = recordReader.next(reuse);
         _recordEnricherPipeline.run(reuse);
 
-        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-          //noinspection unchecked
-          for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
-            transformAndWrite(row);
-          }
-        } else {
-          transformAndWrite(reuse);
+        _transformPipeline.processRow(reuse, reusedResult);
+        for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
+          writeRecord(transformedRow);
         }
       } catch (Exception e) {
         if (!continueOnError) {
@@ -210,29 +213,7 @@ public class SegmentMapper {
     return true;
   }
 
-  private void transformAndWrite(GenericRow row)
-      throws IOException {
-    GenericRow decodedRow = row;
-    if (_complexTypeTransformer != null) {
-      decodedRow = _complexTypeTransformer.transform(row);
-    }
-    Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
-    if (rows != null) {
-      for (GenericRow unrappedRow : rows) {
-        GenericRow transformedRow = _recordTransformer.transform(unrappedRow);
-        if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-          writeRecord(transformedRow);
-        }
-      }
-    } else {
-      GenericRow transformedRow = _recordTransformer.transform(row);
-      if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-        writeRecord(transformedRow);
-      }
-    }
-  }
-
-  private void writeRecord(GenericRow row)
+  protected void writeRecord(GenericRow row)
       throws IOException {
     String timePartition = _timeHandler.handleTime(row);
     if (timePartition == null) {
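
With transformAndWrite() gone, the per-row flow in the map phase reduces to: enrich the row, run it through the pipeline once, then write every row the pipeline emits. One input row can fan out into several (e.g. through complex-type unnesting), and rows failing the should-ingest checks are dropped inside the pipeline. A condensed sketch of that loop, with reader setup and the continue-on-error handling elided:

    // One Result object is reused across all rows to avoid per-row allocations.
    TransformPipeline.Result reusedResult = new TransformPipeline.Result();
    GenericRow reuse = new GenericRow();
    while (recordReader.hasNext()) {
      reuse.clear();
      reuse = recordReader.next(reuse);
      _recordEnricherPipeline.run(reuse);
      // The pipeline applies complex-type handling, record transformers, and
      // ingestion checks internally, replacing the removed transformAndWrite().
      _transformPipeline.processRow(reuse, reusedResult);
      for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
        writeRecord(transformedRow);
      }
    }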
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]