This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9464359324 avoid redundant record transforms (#8935)
9464359324 is described below
commit 94643593247729ff5319feefbad17f17b9531d47
Author: Xiaobing <[email protected]>
AuthorDate: Mon Jun 20 17:07:21 2022 -0700
avoid redundant record transforms (#8935)
---
.../framework/SegmentProcessorFramework.java | 5 ++---
.../realtime/converter/RealtimeSegmentConverter.java | 4 ++--
.../creator/RecordReaderSegmentCreationDataSource.java | 9 +++++++--
.../local/segment/creator/TransformPipeline.java | 7 +++++++
.../creator/impl/SegmentIndexCreationDriverImpl.java | 18 +++++++++++++-----
.../tools/admin/command/DataImportDryRunCommand.java | 5 -----
6 files changed, 31 insertions(+), 17 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 6815c58d97..e86b456915 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -32,8 +32,8 @@ import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordR
import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.reducer.Reducer;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
@@ -132,7 +132,6 @@ public class SegmentProcessorFramework {
}
int maxNumRecordsPerSegment =
_segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
- CompositeTransformer passThroughTransformer = CompositeTransformer.getPassThroughTransformer();
int sequenceId = 0;
for (Map.Entry<String, GenericRowFileManager> entry :
partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
@@ -151,7 +150,7 @@ public class SegmentProcessorFramework {
GenericRowFileRecordReader recordReaderForRange =
recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new
RecordReaderSegmentCreationDataSource(recordReaderForRange),
- passThroughTransformer, null);
+ TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index 8544ea5d19..2ca1758651 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import
org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -119,7 +119,7 @@ public class RealtimeSegmentConverter {
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl,
recordReader);
- driver.init(genConfig, dataSource, CompositeTransformer.getPassThroughTransformer(), null);
+ driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline());
driver.build();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
index 2db825b566..411c040cdd 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -38,16 +38,21 @@ public class RecordReaderSegmentCreationDataSource
implements SegmentCreationDat
private static final Logger LOGGER =
LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class);
private final RecordReader _recordReader;
+ private TransformPipeline _transformPipeline;
public RecordReaderSegmentCreationDataSource(RecordReader recordReader) {
_recordReader = recordReader;
}
+ public void setTransformPipeline(TransformPipeline transformPipeline) {
+ _transformPipeline = transformPipeline;
+ }
+
@Override
public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig
statsCollectorConfig) {
try {
- TransformPipeline transformPipeline =
- new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
+ TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline
+ : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
SegmentPreIndexStatsCollector collector = new
SegmentPreIndexStatsCollectorImpl(statsCollectorConfig);
collector.init();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
index 94021193e7..9da9dab2b9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
@@ -63,6 +63,13 @@ public class TransformPipeline {
_complexTypeTransformer =
ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
}
+ /**
+ * Returns a pass through pipeline that does not transform the record.
+ */
+ public static TransformPipeline getPassThroughPipeline() {
+ return new TransformPipeline(CompositeTransformer.getPassThroughTransformer(), null);
+ }
+
/**
* Process and validate the decoded row against schema.
* @param decodedRow the row data to pass in
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 18a50dbf0b..98ef1e7e0b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import
org.apache.pinot.segment.local.segment.creator.IntermediateSegmentSegmentCreationDataSource;
import
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
@@ -77,7 +76,6 @@ import org.slf4j.LoggerFactory;
* Implementation of an index segment creator.
*/
// TODO: Check resource leaks
-@SuppressWarnings("serial")
public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDriver {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentIndexCreationDriverImpl.class);
@@ -151,20 +149,30 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
LOGGER.info("RecordReaderSegmentCreationDataSource is used");
dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
}
- init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()),
- ComplexTypeTransformer.getComplexTypeTransformer(config.getTableConfig()));
+ init(config, dataSource, new TransformPipeline(config.getTableConfig(), config.getSchema()));
}
+ @Deprecated
public void init(SegmentGeneratorConfig config, SegmentCreationDataSource
dataSource,
RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer
complexTypeTransformer)
throws Exception {
+ init(config, dataSource, new TransformPipeline(recordTransformer, complexTypeTransformer));
+ }
+
+ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
+ TransformPipeline transformPipeline)
+ throws Exception {
_config = config;
_recordReader = dataSource.getRecordReader();
_dataSchema = config.getSchema();
if (config.isFailOnEmptySegment()) {
Preconditions.checkState(_recordReader.hasNext(), "No record in data
source");
}
- _transformPipeline = new TransformPipeline(recordTransformer, complexTypeTransformer);
+ _transformPipeline = transformPipeline;
+ // Use the same transform pipeline if the data source is backed by a record reader
+ if (dataSource instanceof RecordReaderSegmentCreationDataSource) {
+ ((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
+ }
// Initialize stats collection
_segmentStats = dataSource.gatherStats(
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
index 984b324b08..0047806e00 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
@@ -21,7 +21,6 @@ package org.apache.pinot.tools.admin.command;
import java.io.File;
import java.util.TreeMap;
import org.apache.pinot.plugin.inputformat.json.JSONRecordReader;
-import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -57,16 +56,12 @@ public class DataImportDryRunCommand extends
AbstractBaseAdminCommand implements
JSONRecordReader jsonRecordReader = new JSONRecordReader();
jsonRecordReader.init(new File(_jsonFile), null, null);
- RecordReaderSegmentCreationDataSource dataSource =
- new RecordReaderSegmentCreationDataSource(jsonRecordReader);
-
TableConfig tableConfig = JsonUtils.fileToObject(new
File(_tableConfigFile), TableConfig.class);
StatsCollectorConfig statsCollectorConfig = new
StatsCollectorConfig(tableConfig, new Schema(), null);
TransformPipeline transformPipeline =
new TransformPipeline(statsCollectorConfig.getTableConfig(),
statsCollectorConfig.getSchema());
-
// Gather the stats
GenericRow reuse = new GenericRow();
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]