This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a1c9b63 Produce GenericRow file in segment processing mapper (#7013)
a1c9b63 is described below
commit a1c9b631381a25ddd6d3164d6a9ce337c3939b9f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jun 4 14:45:44 2021 -0700
Produce GenericRow file in segment processing mapper (#7013)
Produce GenericRow file instead of Avro file in segment processing mapper:
- To preserve the null value info from the source file
- Apply default record transformation to solve the NPE described in #6902
- GenericRow file can be randomly accessed, which can be used directly as the
record reader
Add `GenericRowFileManager` to wrap the file reader/writer and related
arguments to simplify the handling.
Also modify the `SegmentMapperTest` to ensure the null value info can be
preserved
---
.../processing/collector/ConcatCollector.java | 66 ++---
.../processing/framework/SegmentMapper.java | 179 ++++++++------
.../processing/framework/SegmentMapperConfig.java | 18 +-
.../framework/SegmentProcessorFramework.java | 75 +++---
.../processing/framework/SegmentReducer.java | 49 ++--
.../genericrow/GenericRowFileManager.java | 109 +++++++++
.../genericrow/GenericRowFileReader.java | 3 +-
.../processing/utils/SegmentProcessingUtils.java | 84 +++++++
.../{collector => utils}/SortOrderComparator.java | 2 +-
.../processing/framework/SegmentMapperTest.java | 265 ++++++++++++---------
.../processing/framework/SegmentReducerTest.java | 79 +++---
11 files changed, 569 insertions(+), 360 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
index d7cc173..263a0ec 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
@@ -22,16 +22,17 @@ import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.Arrays;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
+import org.apache.pinot.core.segment.processing.utils.SortOrderComparator;
import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -40,15 +41,10 @@ import org.apache.pinot.spi.data.readers.GenericRow;
* A Collector implementation for collecting and concatenating all incoming
rows.
*/
public class ConcatCollector implements Collector {
- private static final String RECORD_OFFSET_FILE_NAME = "record.offset";
- private static final String RECORD_DATA_FILE_NAME = "record.data";
-
- private final List<FieldSpec> _fieldSpecs = new ArrayList<>();
private final int _numSortColumns;
private final SortOrderComparator _sortOrderComparator;
private final File _workingDir;
- private final File _recordOffsetFile;
- private final File _recordDataFile;
+ private final GenericRowFileManager _recordFileManager;
private GenericRowFileWriter _recordFileWriter;
private GenericRowFileReader _recordFileReader;
@@ -56,44 +52,28 @@ public class ConcatCollector implements Collector {
public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
List<String> sortOrder = collectorConfig.getSortOrder();
+ List<FieldSpec> fieldSpecs;
if (CollectionUtils.isNotEmpty(sortOrder)) {
+ fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
_numSortColumns = sortOrder.size();
- DataType[] sortColumnStoredTypes = new DataType[_numSortColumns];
- for (int i = 0; i < _numSortColumns; i++) {
- String sortColumn = sortOrder.get(i);
- FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
- Preconditions.checkArgument(fieldSpec != null, "Failed to find sort
column: %s", sortColumn);
- Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot
sort on MV column: %s", sortColumn);
- sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType();
- _fieldSpecs.add(fieldSpec);
- }
- _sortOrderComparator = new SortOrderComparator(_numSortColumns,
sortColumnStoredTypes);
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn() &&
!sortOrder.contains(fieldSpec.getName())) {
- _fieldSpecs.add(fieldSpec);
- }
- }
+ _sortOrderComparator =
SegmentProcessingUtils.getSortOrderComparator(fieldSpecs, _numSortColumns);
} else {
+ fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
_numSortColumns = 0;
_sortOrderComparator = null;
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- _fieldSpecs.add(fieldSpec);
- }
- }
}
_workingDir =
new File(FileUtils.getTempDirectory(),
String.format("concat_collector_%d", System.currentTimeMillis()));
Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s
for %s with config: %s",
_workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(),
collectorConfig);
- _recordOffsetFile = new File(_workingDir, RECORD_OFFSET_FILE_NAME);
- _recordDataFile = new File(_workingDir, RECORD_DATA_FILE_NAME);
+ // TODO: Pass 'includeNullFields' from the config
+ _recordFileManager = new GenericRowFileManager(_workingDir, fieldSpecs,
true);
try {
- reset();
+ _recordFileWriter = _recordFileManager.getFileWriter();
} catch (IOException e) {
- throw new RuntimeException("Caught exception while resetting the
collector", e);
+ throw new RuntimeException("Caught exception while creating the file
writer", e);
}
}
@@ -107,8 +87,8 @@ public class ConcatCollector implements Collector {
@Override
public Iterator<GenericRow> iterator()
throws IOException {
- _recordFileWriter.close();
- _recordFileReader = new GenericRowFileReader(_recordOffsetFile,
_recordDataFile, _fieldSpecs, true);
+ _recordFileManager.closeFileWriter();
+ _recordFileReader = _recordFileManager.getFileReader();
// TODO: A lot of this code can be made common across Collectors, once
{@link RollupCollector} is also converted to off heap implementation
if (_numSortColumns != 0) {
@@ -156,15 +136,8 @@ public class ConcatCollector implements Collector {
@Override
public void reset()
throws IOException {
- if (_recordFileWriter != null) {
- _recordFileWriter.close();
- }
- if (_recordFileReader != null) {
- _recordFileReader.close();
- }
- FileUtils.cleanDirectory(_workingDir);
- // TODO: Pass 'includeNullFields' from the config
- _recordFileWriter = new GenericRowFileWriter(_recordOffsetFile,
_recordDataFile, _fieldSpecs, true);
+ _recordFileManager.cleanUp();
+ _recordFileWriter = _recordFileManager.getFileWriter();
_numDocs = 0;
}
@@ -172,12 +145,7 @@ public class ConcatCollector implements Collector {
public void close()
throws IOException {
try {
- if (_recordFileWriter != null) {
- _recordFileWriter.close();
- }
- if (_recordFileReader != null) {
- _recordFileReader.close();
- }
+ _recordFileManager.cleanUp();
} finally {
FileUtils.deleteQuietly(_workingDir);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
index 7abe6ad..c16479c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
@@ -20,133 +20,156 @@ package
org.apache.pinot.core.segment.processing.framework;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.segment.processing.filter.RecordFilter;
import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
import
org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
-import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
+import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.partition.Partitioner;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mapper phase of the SegmentProcessorFramework.
- * Reads the input segment and creates partitioned avro data files
+ * Reads the input records and creates partitioned generic row files.
* Performs:
* - record filtering
* - column transformations
* - partitioning
*/
public class SegmentMapper {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentMapper.class);
- private final File _inputSegment;
+
+ private final List<RecordReader> _recordReaders;
private final File _mapperOutputDir;
- private final String _mapperId;
- private final Schema _avroSchema;
- private final RecordTransformer _recordTransformer;
+ private final List<FieldSpec> _fieldSpecs;
+ private final boolean _includeNullFields;
+
+ // TODO: Merge the following transformers into one. Currently we need an
extra DataTypeTransformer in the end in case
+ // _recordTransformer changes the data type.
+ private final CompositeTransformer _defaultRecordTransformer;
private final RecordFilter _recordFilter;
- private final int _numPartitioners;
- private final List<Partitioner> _partitioners = new ArrayList<>();
- private final Map<String, DataFileWriter<GenericData.Record>>
_partitionToDataFileWriterMap = new HashMap<>();
+ private final RecordTransformer _recordTransformer;
+ private final DataTypeTransformer _dataTypeTransformer;
+
+ private final Partitioner[] _partitioners;
+ private final String[] _partitionsBuffer;
+ private final Map<String, GenericRowFileManager> _partitionToFileManagerMap
= new HashMap<>();
- public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig
mapperConfig, File mapperOutputDir) {
- _inputSegment = inputSegment;
+ public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig
mapperConfig, File mapperOutputDir) {
+ _recordReaders = recordReaders;
_mapperOutputDir = mapperOutputDir;
- _mapperId = mapperId;
- _avroSchema =
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+ TableConfig tableConfig = mapperConfig.getTableConfig();
+ Schema schema = mapperConfig.getSchema();
+ List<String> sortOrder = tableConfig.getIndexingConfig().getSortedColumn();
+ if (CollectionUtils.isNotEmpty(sortOrder)) {
+ _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
+ } else {
+ _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
+ }
+ _includeNullFields =
tableConfig.getIndexingConfig().isNullHandlingEnabled();
+ _defaultRecordTransformer =
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
_recordFilter =
RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
_recordTransformer =
RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
- for (PartitionerConfig partitionerConfig :
mapperConfig.getPartitionerConfigs()) {
- _partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
+ _dataTypeTransformer = new DataTypeTransformer(schema);
+ List<PartitionerConfig> partitionerConfigs =
mapperConfig.getPartitionerConfigs();
+ int numPartitioners = partitionerConfigs.size();
+ _partitioners = new Partitioner[numPartitioners];
+ _partitionsBuffer = new String[numPartitioners];
+ for (int i = 0; i < numPartitioners; i++) {
+ _partitioners[i] =
PartitionerFactory.getPartitioner(partitionerConfigs.get(i));
}
- _numPartitioners = _partitioners.size();
LOGGER.info(
- "Initialized mapper with id: {}, input segment: {}, output dir: {},
recordTransformer: {}, recordFilter: {}, partitioners: {}",
- _mapperId, _inputSegment, _mapperOutputDir,
_recordTransformer.getClass(), _recordFilter.getClass(),
- _partitioners.stream().map(p ->
p.getClass().toString()).collect(Collectors.joining(",")));
+ "Initialized mapper with {} record readers, output dir: {},
recordTransformer: {}, recordFilter: {}, partitioners: {}",
+ _recordReaders.size(), _mapperOutputDir,
_recordTransformer.getClass(), _recordFilter.getClass(),
+ Arrays.stream(_partitioners).map(p ->
p.getClass().toString()).collect(Collectors.joining(",")));
}
/**
- * Reads the input segment and generates partitioned avro data files into
the mapper output directory
- * Records for each partition are put into a directory of its own withing
the mapper output directory, identified by the partition name
+ * Reads the input records and generates partitioned generic row files into
the mapper output directory.
+ * Records for each partition are put into a directory of the partition name
within the mapper output directory.
*/
- public void map()
+ public Map<String, GenericRowFileManager> map()
throws Exception {
-
- PinotSegmentRecordReader segmentRecordReader = new
PinotSegmentRecordReader(_inputSegment);
- GenericRow reusableRow = new GenericRow();
- GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
- String[] partitions = new String[_numPartitioners];
-
- while (segmentRecordReader.hasNext()) {
- reusableRow = segmentRecordReader.next(reusableRow);
-
- // Record filtering
- if (_recordFilter.filter(reusableRow)) {
- continue;
- }
-
- // Record transformation
- reusableRow = _recordTransformer.transformRecord(reusableRow);
-
- // Partitioning
- int p = 0;
- for (Partitioner partitioner : _partitioners) {
- partitions[p++] = partitioner.getPartition(reusableRow);
- }
- String partition = StringUtil.join("_", partitions);
-
- // Create writer for the partition, if not exists
- DataFileWriter<GenericData.Record> recordWriter =
_partitionToDataFileWriterMap.get(partition);
- if (recordWriter == null) {
- File partDir = new File(_mapperOutputDir, partition);
- if (!partDir.exists()) {
- Files.createDirectory(Paths.get(partDir.getAbsolutePath()));
+ GenericRow reuse = new GenericRow();
+ for (RecordReader recordReader : _recordReaders) {
+ while (recordReader.hasNext()) {
+ reuse = recordReader.next(reuse);
+
+ // TODO: Add ComplexTypeTransformer here. Currently it is not
idempotent so cannot add it
+
+ if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+ //noinspection unchecked
+ for (GenericRow row : (Collection<GenericRow>)
reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+ GenericRow transformedRow =
_defaultRecordTransformer.transform(row);
+ if (transformedRow != null &&
IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+ .filter(transformedRow)) {
+ writeRecord(transformedRow);
+ }
+ }
+ } else {
+ GenericRow transformedRow =
_defaultRecordTransformer.transform(reuse);
+ if (transformedRow != null &&
IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+ .filter(transformedRow)) {
+ writeRecord(transformedRow);
+ }
}
- recordWriter = new DataFileWriter<>(new
GenericDatumWriter<>(_avroSchema));
- recordWriter.create(_avroSchema, new File(partDir,
createMapperOutputFileName(_mapperId)));
- _partitionToDataFileWriterMap.put(partition, recordWriter);
- }
- // Write record to avro file for its partition
- SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(reusableRow,
reusableRecord);
- recordWriter.append(reusableRecord);
+ reuse.clear();
+ }
+ }
- reusableRow.clear();
+ for (GenericRowFileManager fileManager :
_partitionToFileManagerMap.values()) {
+ fileManager.closeFileWriter();
}
+
+ return _partitionToFileManagerMap;
}
- /**
- * Cleanup the mapper state
- */
- public void cleanup()
+ private void writeRecord(GenericRow row)
throws IOException {
- for (DataFileWriter<GenericData.Record> recordDataFileWriter :
_partitionToDataFileWriterMap.values()) {
- recordDataFileWriter.close();
+ // Record transformation
+ row =
_dataTypeTransformer.transform(_recordTransformer.transformRecord(row));
+
+ // Partitioning
+ int numPartitioners = _partitioners.length;
+ for (int i = 0; i < numPartitioners; i++) {
+ _partitionsBuffer[i] = _partitioners[i].getPartition(row);
+ }
+ String partition = StringUtil.join("_", _partitionsBuffer);
+
+ // Create writer for the partition if not exists
+ GenericRowFileManager fileManager =
_partitionToFileManagerMap.get(partition);
+ if (fileManager == null) {
+ File partitionOutputDir = new File(_mapperOutputDir, partition);
+ FileUtils.forceMkdir(partitionOutputDir);
+ fileManager = new GenericRowFileManager(partitionOutputDir, _fieldSpecs,
_includeNullFields);
+ _partitionToFileManagerMap.put(partition, fileManager);
}
- }
- public static String createMapperOutputFileName(String mapperId) {
- return "mapper_" + mapperId + ".avro";
+ fileManager.getFileWriter().write(row);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
index e6360a6..96cf2e5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import
org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -29,25 +30,30 @@ import org.apache.pinot.spi.data.Schema;
* Config for the mapper phase of SegmentProcessorFramework
*/
public class SegmentMapperConfig {
-
- private final Schema _pinotSchema;
+ private final TableConfig _tableConfig;
+ private final Schema _schema;
private final RecordTransformerConfig _recordTransformerConfig;
private final RecordFilterConfig _recordFilterConfig;
private final List<PartitionerConfig> _partitionerConfigs;
- public SegmentMapperConfig(Schema pinotSchema, RecordTransformerConfig
recordTransformerConfig,
+ public SegmentMapperConfig(TableConfig tableConfig, Schema schema,
RecordTransformerConfig recordTransformerConfig,
RecordFilterConfig recordFilterConfig, List<PartitionerConfig>
partitionerConfigs) {
- _pinotSchema = pinotSchema;
+ _tableConfig = tableConfig;
+ _schema = schema;
_recordTransformerConfig = recordTransformerConfig;
_recordFilterConfig = recordFilterConfig;
_partitionerConfigs = partitionerConfigs;
}
+ public TableConfig getTableConfig() {
+ return _tableConfig;
+ }
+
/**
* The Pinot schema
*/
- public Schema getPinotSchema() {
- return _pinotSchema;
+ public Schema getSchema() {
+ return _schema;
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 903f065..34c280a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -20,14 +20,20 @@ package org.apache.pinot.core.segment.processing.framework;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,15 +49,14 @@ import org.slf4j.LoggerFactory;
* (eg task which merges segments, tasks which aligns segments per time
boundaries etc)
*/
public class SegmentProcessorFramework {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentProcessorFramework.class);
private final File _inputSegmentsDir;
private final File _outputSegmentsDir;
private final SegmentProcessorConfig _segmentProcessorConfig;
- private final Schema _pinotSchema;
private final TableConfig _tableConfig;
+ private final Schema _schema;
private final File _baseDir;
private final File _mapperInputDir;
@@ -80,8 +85,8 @@ public class SegmentProcessorFramework {
"Must provide existing empty output directory: %s",
_outputSegmentsDir.getAbsolutePath());
_segmentProcessorConfig = segmentProcessorConfig;
- _pinotSchema = segmentProcessorConfig.getSchema();
_tableConfig = segmentProcessorConfig.getTableConfig();
+ _schema = segmentProcessorConfig.getSchema();
_baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_" +
System.currentTimeMillis());
FileUtils.deleteQuietly(_baseDir);
@@ -105,58 +110,58 @@ public class SegmentProcessorFramework {
*/
public void processSegments()
throws Exception {
-
// Check for input segments
File[] segmentFiles = _inputSegmentsDir.listFiles();
- if (segmentFiles.length == 0) {
- throw new IllegalStateException("No segments found in input dir: " +
_inputSegmentsDir.getAbsolutePath()
- + ". Exiting SegmentProcessorFramework.");
- }
+ Preconditions
+ .checkState(segmentFiles != null && segmentFiles.length > 0, "Failed
to find segments under input dir: %s",
+ _inputSegmentsDir.getAbsolutePath());
// Mapper phase.
LOGGER.info("Beginning mapper phase. Processing segments: {}",
Arrays.toString(_inputSegmentsDir.list()));
- for (File segment : segmentFiles) {
-
- String fileName = segment.getName();
- File mapperInput = segment;
+ SegmentMapperConfig mapperConfig =
+ new SegmentMapperConfig(_tableConfig, _schema,
_segmentProcessorConfig.getRecordTransformerConfig(),
+ _segmentProcessorConfig.getRecordFilterConfig(),
_segmentProcessorConfig.getPartitionerConfigs());
+ List<RecordReader> recordReaders = new ArrayList<>(segmentFiles.length);
+ for (File indexDir : segmentFiles) {
+ String fileName = indexDir.getName();
// Untar the segments if needed
- if (!segment.isDirectory()) {
+ if (!indexDir.isDirectory()) {
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
- mapperInput = TarGzCompressionUtils.untar(segment,
_mapperInputDir).get(0);
+ indexDir = TarGzCompressionUtils.untar(indexDir,
_mapperInputDir).get(0);
} else {
- throw new IllegalStateException("Unsupported segment format: " +
segment.getAbsolutePath());
+ throw new IllegalStateException("Unsupported segment format: " +
indexDir.getAbsolutePath());
}
}
- // Set mapperId as the name of the segment
- SegmentMapperConfig mapperConfig =
- new SegmentMapperConfig(_pinotSchema,
_segmentProcessorConfig.getRecordTransformerConfig(),
- _segmentProcessorConfig.getRecordFilterConfig(),
_segmentProcessorConfig.getPartitionerConfigs());
- SegmentMapper mapper = new SegmentMapper(mapperInput.getName(),
mapperInput, mapperConfig, _mapperOutputDir);
- mapper.map();
- mapper.cleanup();
+ PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+ // NOTE: Do not fill null field with default value to be consistent with
other record readers
+ recordReader.init(indexDir, null, null, true);
+ recordReaders.add(recordReader);
+ }
+ SegmentMapper mapper = new SegmentMapper(recordReaders, mapperConfig,
_mapperOutputDir);
+ Map<String, GenericRowFileManager> partitionToFileManagerMap =
mapper.map();
+ for (RecordReader recordReader : recordReaders) {
+ recordReader.close();
}
// Check for mapper output files
- File[] mapperOutputFiles = _mapperOutputDir.listFiles();
- if (mapperOutputFiles.length == 0) {
- throw new IllegalStateException("No files found in mapper output
directory: " + _mapperOutputDir.getAbsolutePath()
- + ". Exiting SegmentProcessorFramework.");
- }
+ int numPartitions = partitionToFileManagerMap.size();
+ Preconditions.checkState(numPartitions > 0, "No partition generated from
mapper phase");
// Reducer phase.
- LOGGER.info("Beginning reducer phase. Processing files: {}",
Arrays.toString(_mapperOutputDir.list()));
- // Mapper output directory has 1 directory per partition, named after the
partition. Each directory contains 1 or more avro files.
- for (File partDir : mapperOutputFiles) {
-
+ LOGGER.info("Beginning reducer phase on partitions: {}",
partitionToFileManagerMap.keySet());
+ for (Map.Entry<String, GenericRowFileManager> entry :
partitionToFileManagerMap.entrySet()) {
+ String partition = entry.getKey();
+ GenericRowFileManager fileManager = entry.getValue();
// Set partition as reducerId
SegmentReducerConfig reducerConfig =
- new SegmentReducerConfig(_pinotSchema,
_segmentProcessorConfig.getCollectorConfig(),
+ new SegmentReducerConfig(_schema,
_segmentProcessorConfig.getCollectorConfig(),
_segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment());
- SegmentReducer reducer = new SegmentReducer(partDir.getName(), partDir,
reducerConfig, _reducerOutputDir);
+ SegmentReducer reducer = new SegmentReducer(partition, fileManager,
reducerConfig, _reducerOutputDir);
reducer.reduce();
reducer.cleanup();
+ fileManager.cleanUp();
}
// Check for reducer output files
@@ -172,12 +177,12 @@ public class SegmentProcessorFramework {
// Reducer output directory will have 1 or more avro files
int segmentNum = 0;
for (File resultFile : reducerOutputFiles) {
- SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(_tableConfig, _pinotSchema);
+ SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(_tableConfig, _schema);
segmentGeneratorConfig.setTableName(_tableConfig.getTableName());
segmentGeneratorConfig.setOutDir(_outputSegmentsDir.getAbsolutePath());
segmentGeneratorConfig.setInputFilePath(resultFile.getAbsolutePath());
segmentGeneratorConfig.setFormat(FileFormat.AVRO);
- segmentGeneratorConfig.setSequenceId(segmentNum ++);
+ segmentGeneratorConfig.setSequenceId(segmentNum++);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig);
driver.build();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
index 4bcd373..26a2be7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
@@ -26,11 +26,11 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.core.segment.processing.collector.Collector;
import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,27 +48,27 @@ public class SegmentReducer {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentReducer.class);
- private final File _reducerInputDir;
+ private final String _reducerId;
+ private final GenericRowFileManager _fileManager;
private final File _reducerOutputDir;
- private final String _reducerId;
private final Schema _pinotSchema;
private final org.apache.avro.Schema _avroSchema;
private final Collector _collector;
private final int _numRecordsPerPart;
- public SegmentReducer(String reducerId, File reducerInputDir,
SegmentReducerConfig reducerConfig,
+ public SegmentReducer(String reducerId, GenericRowFileManager fileManager,
SegmentReducerConfig reducerConfig,
File reducerOutputDir) {
- _reducerInputDir = reducerInputDir;
+ _reducerId = reducerId;
+ _fileManager = fileManager;
_reducerOutputDir = reducerOutputDir;
- _reducerId = reducerId;
_pinotSchema = reducerConfig.getPinotSchema();
_avroSchema =
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
_collector =
CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
_numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
- LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir:
{}, collector: {}, numRecordsPerPart: {}",
- _reducerId, _reducerInputDir, _reducerOutputDir,
_collector.getClass(), _numRecordsPerPart);
+ LOGGER.info("Initialized reducer with id: {}, output dir: {}, collector:
{}, numRecordsPerPart: {}", _reducerId,
+ _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
}
/**
@@ -77,27 +77,22 @@ public class SegmentReducer {
*/
public void reduce()
throws Exception {
-
int part = 0;
- for (File inputFile : _reducerInputDir.listFiles()) {
-
- RecordReader avroRecordReader = RecordReaderFactory
-
.getRecordReaderByClass("org.apache.pinot.plugin.inputformat.avro.AvroRecordReader",
inputFile,
- _pinotSchema.getColumnNames(), null);
-
- while (avroRecordReader.hasNext()) {
- GenericRow next = avroRecordReader.next();
-
- // Aggregations
- _collector.collect(next);
-
- // Reached max records per part file. Flush
- if (_collector.size() == _numRecordsPerPart) {
- flushRecords(_collector, createReducerOutputFileName(_reducerId,
part++));
- _collector.reset();
- }
+ GenericRowFileReader fileReader = _fileManager.getFileReader();
+ int numRows = fileReader.getNumRows();
+ for (int i = 0; i < numRows; i++) {
+ GenericRow next = fileReader.read(i, new GenericRow());
+
+ // Aggregations
+ _collector.collect(next);
+
+ // Reached max records per part file. Flush
+ if (_collector.size() == _numRecordsPerPart) {
+ flushRecords(_collector, createReducerOutputFileName(_reducerId,
part++));
+ _collector.reset();
}
}
+ _fileManager.closeFileReader();
if (_collector.size() > 0) {
flushRecords(_collector, createReducerOutputFileName(_reducerId, part));
_collector.reset();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
new file mode 100644
index 0000000..9fd9401
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Manager for generic row files.
+ */
+public class GenericRowFileManager {
+ public static final String OFFSET_FILE_NAME = "record.offset";
+ public static final String DATA_FILE_NAME = "record.data";
+
+ private final File _offsetFile;
+ private final File _dataFile;
+ private final List<FieldSpec> _fieldSpecs;
+ private final boolean _includeNullFields;
+
+ private GenericRowFileWriter _fileWriter;
+ private GenericRowFileReader _fileReader;
+
+ public GenericRowFileManager(File outputDir, List<FieldSpec> fieldSpecs,
boolean includeNullFields) {
+ _offsetFile = new File(outputDir, OFFSET_FILE_NAME);
+ _dataFile = new File(outputDir, DATA_FILE_NAME);
+ _fieldSpecs = fieldSpecs;
+ _includeNullFields = includeNullFields;
+ }
+
+ /**
+ * Returns the file writer. Creates one if not exists.
+ */
+ public GenericRowFileWriter getFileWriter()
+ throws IOException {
+ if (_fileWriter == null) {
+ Preconditions.checkState(!_offsetFile.exists(), "Record offset file: %s
already exists", _offsetFile);
+ Preconditions.checkState(!_dataFile.exists(), "Record data file: %s
already exists", _dataFile);
+ _fileWriter = new GenericRowFileWriter(_offsetFile, _dataFile,
_fieldSpecs, _includeNullFields);
+ }
+ return _fileWriter;
+ }
+
+ /**
+ * Closes the file writer.
+ */
+ public void closeFileWriter()
+ throws IOException {
+ if (_fileWriter != null) {
+ _fileWriter.close();
+ _fileWriter = null;
+ }
+ }
+
+ /**
+ * Returns the file reader. Creates one if not exists.
+ */
+ public GenericRowFileReader getFileReader()
+ throws IOException {
+ if (_fileReader == null) {
+ Preconditions.checkState(_offsetFile.exists(), "Record offset file: %s
does not exist", _offsetFile);
+ Preconditions.checkState(_dataFile.exists(), "Record data file: %s does
not exist", _dataFile);
+ _fileReader = new GenericRowFileReader(_offsetFile, _dataFile,
_fieldSpecs, _includeNullFields);
+ }
+ return _fileReader;
+ }
+
+ /**
+ * Closes the file reader.
+ */
+ public void closeFileReader()
+ throws IOException {
+ if (_fileReader != null) {
+ _fileReader.close();
+ _fileReader = null;
+ }
+ }
+
+ /**
+ * Cleans up the files.
+ */
+ public void cleanUp()
+ throws IOException {
+ closeFileWriter();
+ closeFileReader();
+ FileUtils.deleteQuietly(_offsetFile);
+ FileUtils.deleteQuietly(_dataFile);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
index 0e1ba1b..13102aa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
@@ -60,8 +60,7 @@ public class GenericRowFileReader implements Closeable {
*/
public GenericRow read(int rowId, GenericRow reuse) {
long offset = _offsetBuffer.getLong((long) rowId << 3); // rowId *
Long.BYTES
- _deserializer.deserialize(offset, reuse);
- return reuse;
+ return _deserializer.deserialize(offset, reuse);
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
new file mode 100644
index 0000000..698a105
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+
+
+public final class SegmentProcessingUtils {
+ private SegmentProcessingUtils() {
+ }
+
+ /**
+ * Returns the field specs (physical only) with the names sorted in
alphabetical order.
+ */
+ public static List<FieldSpec> getFieldSpecs(Schema schema) {
+ List<FieldSpec> fieldSpecs = new ArrayList<>();
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ if (!fieldSpec.isVirtualColumn()) {
+ fieldSpecs.add(fieldSpec);
+ }
+ }
+ fieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
+ return fieldSpecs;
+ }
+
+ /**
+ * Returns the field specs (physical only) with sorted column in the front,
followed by other columns sorted in
+ * alphabetical order.
+ */
+ public static List<FieldSpec> getFieldSpecs(Schema schema, List<String>
sortOrder) {
+ List<FieldSpec> fieldSpecs = new ArrayList<>();
+ for (String sortColumn : sortOrder) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
+ Preconditions.checkArgument(fieldSpec != null, "Failed to find sort
column: %s", sortColumn);
+ Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort
on MV column: %s", sortColumn);
+ Preconditions.checkArgument(!fieldSpec.isVirtualColumn(), "Cannot sort
on virtual column: %s", sortColumn);
+ fieldSpecs.add(fieldSpec);
+ }
+
+ List<FieldSpec> nonSortFieldSpecs = new ArrayList<>();
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ if (!fieldSpec.isVirtualColumn() &&
!sortOrder.contains(fieldSpec.getName())) {
+ nonSortFieldSpecs.add(fieldSpec);
+ }
+ }
+ nonSortFieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
+
+ fieldSpecs.addAll(nonSortFieldSpecs);
+ return fieldSpecs;
+ }
+
+ /**
+ * Returns the value comparator based on the sort order.
+ */
+ public static SortOrderComparator getSortOrderComparator(List<FieldSpec>
fieldSpecs, int numSortColumns) {
+ DataType[] sortColumnStoredTypes = new DataType[numSortColumns];
+ for (int i = 0; i < numSortColumns; i++) {
+ sortColumnStoredTypes[i] =
fieldSpecs.get(i).getDataType().getStoredType();
+ }
+ return new SortOrderComparator(numSortColumns, sortColumnStoredTypes);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
similarity index 97%
rename from
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
index 34b63b6..5652957 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.utils;
import java.util.Comparator;
import org.apache.pinot.spi.data.FieldSpec.DataType;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 6e01815..682cd83 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.processing.framework;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,12 +31,14 @@ import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
import
org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -50,6 +54,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -57,39 +62,42 @@ import static org.testng.Assert.assertTrue;
* Tests for {@link SegmentMapper}
*/
public class SegmentMapperTest {
-
- private File _baseDir;
- private File _inputSegment;
- private Schema _pinotSchema;
- private final List<Object[]> _rawData = Lists
- .newArrayList(new Object[]{"abc", 1000, 1597719600000L}, new
Object[]{"pqr", 2000, 1597773600000L},
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"SegmentMapperTest");
+
+ private final TableConfig _tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("timeValue")
+ .setNullHandlingEnabled(true).build();
+ private final Schema _schema = new
Schema.SchemaBuilder().setSchemaName("myTable")
+ .addSingleValueDimension("campaign", FieldSpec.DataType.STRING,
"xyz").addMetric("clicks", FieldSpec.DataType.INT)
+ .addDateTime("timeValue", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+ private final List<Object[]> _rawData = Arrays
+ .asList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{"pqr",
2000, 1597773600000L},
new Object[]{"abc", 1000, 1597777200000L}, new Object[]{"abc", 4000,
1597795200000L},
new Object[]{"abc", 3000, 1597802400000L}, new Object[]{"pqr", 1000,
1597838400000L},
- new Object[]{"xyz", 4000, 1597856400000L}, new Object[]{"pqr", 1000,
1597878000000L},
- new Object[]{"abc", 7000, 1597881600000L}, new Object[]{"xyz", 6000,
1597892400000L});
+ new Object[]{null, 4000, 1597856400000L}, new Object[]{"pqr", 1000,
1597878000000L},
+ new Object[]{"abc", 7000, 1597881600000L}, new Object[]{null, 6000,
1597892400000L});
+
+ private File _indexDir;
@BeforeClass
- public void before()
+ public void setUp()
throws Exception {
- _baseDir = new File(FileUtils.getTempDirectory(), "segment_mapper_test_" +
System.currentTimeMillis());
- FileUtils.deleteQuietly(_baseDir);
- assertTrue(_baseDir.mkdirs());
+ FileUtils.deleteQuietly(TEMP_DIR);
+ assertTrue(TEMP_DIR.mkdirs());
// Segment directory
- File inputSegmentDir = new File(_baseDir, "input_segment");
+ File inputSegmentDir = new File(TEMP_DIR, "input_segment");
assertTrue(inputSegmentDir.mkdirs());
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("timeValue").build();
- _pinotSchema = new Schema.SchemaBuilder().setSchemaName("mySchema")
- .addSingleValueDimension("campaign",
FieldSpec.DataType.STRING).addMetric("clicks", FieldSpec.DataType.INT)
- .addDateTime("timeValue", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-
// Create test data
List<GenericRow> inputRows = new ArrayList<>();
for (Object[] rawRow : _rawData) {
GenericRow row = new GenericRow();
- row.putValue("campaign", rawRow[0]);
+ if (rawRow[0] != null) {
+ row.putValue("campaign", rawRow[0]);
+ } else {
+ row.putDefaultNullValue("campaign", "xyz");
+ }
row.putValue("clicks", rawRow[1]);
row.putValue("timeValue", rawRow[2]);
inputRows.add(row);
@@ -97,57 +105,67 @@ public class SegmentMapperTest {
// Create test segment
RecordReader recordReader = new GenericRowRecordReader(inputRows);
- SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, _pinotSchema);
- segmentGeneratorConfig.setTableName(tableConfig.getTableName());
+ SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(_tableConfig, _schema);
segmentGeneratorConfig.setOutDir(inputSegmentDir.getAbsolutePath());
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig, recordReader);
driver.build();
- assertEquals(inputSegmentDir.listFiles().length, 1);
- _inputSegment = inputSegmentDir.listFiles()[0];
+ File[] segmentFiles = inputSegmentDir.listFiles();
+ assertTrue(segmentFiles != null && segmentFiles.length == 1);
+ _indexDir = segmentFiles[0];
}
@Test(dataProvider = "segmentMapperConfigProvider")
- public void segmentMapperTest(String mapperId, SegmentMapperConfig
segmentMapperConfig,
- Map<String, List<Object[]>> partitionToRecords)
+ public void segmentMapperTest(SegmentMapperConfig segmentMapperConfig,
Map<String, List<Object[]>> partitionToRecords)
throws Exception {
-
- File mapperOutputDir = new File(_baseDir, "mapper_output");
+ File mapperOutputDir = new File(TEMP_DIR, "mapper_output");
FileUtils.deleteQuietly(mapperOutputDir);
assertTrue(mapperOutputDir.mkdirs());
- SegmentMapper segmentMapper = new SegmentMapper(mapperId, _inputSegment,
segmentMapperConfig, mapperOutputDir);
- segmentMapper.map();
- segmentMapper.cleanup();
-
- File[] partitionDirs = mapperOutputDir.listFiles();
- // as many directories in output as num partitions created + passed filter
- assertEquals(partitionDirs.length, partitionToRecords.size());
- for (File partitionDir : partitionDirs) {
- String partition = partitionDir.getName();
- // directory named after every partition
- assertTrue(partitionToRecords.containsKey(partition));
- // each partition directory has as many files as mapper
- File[] avroFiles = partitionDir.listFiles();
- assertEquals(avroFiles.length, 1);
- assertEquals(avroFiles[0].getName(),
SegmentMapper.createMapperOutputFileName(mapperId));
-
- RecordReader avroRecordReader = new AvroRecordReader();
- avroRecordReader.init(avroFiles[0], _pinotSchema.getColumnNames(), null);
- int numRecords = 0;
+
+ PinotSegmentRecordReader segmentRecordReader = new
PinotSegmentRecordReader();
+ segmentRecordReader.init(_indexDir, null, null, true);
+ SegmentMapper segmentMapper =
+ new SegmentMapper(Collections.singletonList(segmentRecordReader),
segmentMapperConfig, mapperOutputDir);
+ Map<String, GenericRowFileManager> partitionToFileManagerMap =
segmentMapper.map();
+ segmentRecordReader.close();
+
+ assertEquals(partitionToFileManagerMap.size(), partitionToRecords.size());
+ for (Map.Entry<String, GenericRowFileManager> entry :
partitionToFileManagerMap.entrySet()) {
+ // Directory named after every partition
+ String partition = entry.getKey();
+ File partitionDir = new File(mapperOutputDir, partition);
+ assertTrue(partitionDir.isDirectory());
+
+ // Each partition directory should contain 2 files (offset & data)
+ String[] fileNames = partitionDir.list();
+ assertNotNull(fileNames);
+ assertEquals(fileNames.length, 2);
+ Arrays.sort(fileNames);
+ assertEquals(fileNames[0], GenericRowFileManager.DATA_FILE_NAME);
+ assertEquals(fileNames[1], GenericRowFileManager.OFFSET_FILE_NAME);
+
+ GenericRowFileManager fileManager = entry.getValue();
+ GenericRowFileReader fileReader = fileManager.getFileReader();
+ int numRows = fileReader.getNumRows();
List<Object[]> expectedRecords = partitionToRecords.get(partition);
- GenericRow next = new GenericRow();
- while (avroRecordReader.hasNext()) {
- avroRecordReader.next(next);
- assertEquals(next.getValue("campaign"),
expectedRecords.get(numRecords)[0]);
- assertEquals(next.getValue("clicks"),
expectedRecords.get(numRecords)[1]);
- assertEquals(next.getValue("timeValue"),
expectedRecords.get(numRecords)[2]);
- numRecords++;
+ assertEquals(numRows, expectedRecords.size());
+ GenericRow reuse = new GenericRow();
+ for (int i = 0; i < numRows; i++) {
+ reuse = fileReader.read(i, reuse);
+ Object[] expectedValues = expectedRecords.get(i);
+ assertEquals(reuse.getValue("campaign"), expectedValues[0]);
+ assertEquals(reuse.getValue("clicks"), expectedValues[1]);
+ assertEquals(reuse.getValue("timeValue"), expectedValues[2]);
+ // Default null value
+ if (expectedValues[0].equals("xyz")) {
+ assertEquals(reuse.getNullValueFields(),
Collections.singleton("campaign"));
+ } else {
+ assertEquals(reuse.getNullValueFields(), Collections.emptySet());
+ }
}
- assertEquals(numRecords, expectedRecords.size());
+ fileManager.cleanUp();
}
-
- FileUtils.deleteQuietly(mapperOutputDir);
}
/**
@@ -155,139 +173,152 @@ public class SegmentMapperTest {
*/
@DataProvider(name = "segmentMapperConfigProvider")
public Object[][] segmentMapperConfigProvider() {
- String mapperId = "aMapperId";
- List<Object[]> outputData = new ArrayList<>();
- _rawData.forEach(r -> outputData.add(new Object[]{r[0], r[1], r[2]}));
+ List<Object[]> outputData = Arrays
+ .asList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{"pqr",
2000, 1597773600000L},
+ new Object[]{"abc", 1000, 1597777200000L}, new Object[]{"abc",
4000, 1597795200000L},
+ new Object[]{"abc", 3000, 1597802400000L}, new Object[]{"pqr",
1000, 1597838400000L},
+ new Object[]{"xyz", 4000, 1597856400000L}, new Object[]{"pqr",
1000, 1597878000000L},
+ new Object[]{"abc", 7000, 1597881600000L}, new Object[]{"xyz",
6000, 1597892400000L});
List<Object[]> inputs = new ArrayList<>();
// default configs
- SegmentMapperConfig config1 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(new
PartitionerConfig.Builder().build()));
+ SegmentMapperConfig config1 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new RecordFilterConfig.Builder().build(), Lists.newArrayList(new
PartitionerConfig.Builder().build()));
Map<String, List<Object[]>> expectedRecords1 = new HashMap<>();
expectedRecords1.put("0", outputData);
- inputs.add(new Object[]{mapperId, config1, expectedRecords1});
+ inputs.add(new Object[]{config1, expectedRecords1});
// round robin partitioner
- SegmentMapperConfig config12 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
- .setNumPartitions(3).build()));
+ SegmentMapperConfig config12 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+ new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
+ .setNumPartitions(3).build()));
Map<String, List<Object[]>> expectedRecords12 = new HashMap<>();
IntStream.range(0, 3).forEach(i ->
expectedRecords12.put(String.valueOf(i), new ArrayList<>()));
for (int i = 0; i < outputData.size(); i++) {
expectedRecords12.get(String.valueOf(i % 3)).add(outputData.get(i));
}
- inputs.add(new Object[]{mapperId, config12, expectedRecords12});
+ inputs.add(new Object[]{config12, expectedRecords12});
// partition by timeValue
- SegmentMapperConfig config2 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("timeValue").build()));
+ SegmentMapperConfig config2 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+ new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("timeValue").build()));
Map<String, List<Object[]>> expectedRecords2 =
outputData.stream().collect(Collectors.groupingBy(r ->
String.valueOf(r[2]), Collectors.toList()));
- inputs.add(new Object[]{mapperId, config2, expectedRecords2});
+ inputs.add(new Object[]{config2, expectedRecords2});
// partition by campaign
- SegmentMapperConfig config3 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("campaign").build()));
+ SegmentMapperConfig config3 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+ new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("campaign").build()));
Map<String, List<Object[]>> expectedRecords3 =
outputData.stream().collect(Collectors.groupingBy(r ->
String.valueOf(r[0]), Collectors.toList()));
- inputs.add(new Object[]{mapperId, config3, expectedRecords3});
+ inputs.add(new Object[]{config3, expectedRecords3});
// transform function partition
- SegmentMapperConfig config4 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
- .setTransformFunction("toEpochDays(timeValue)").build()));
+ SegmentMapperConfig config4 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+ new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
+ .setTransformFunction("toEpochDays(timeValue)").build()));
Map<String, List<Object[]>> expectedRecords4 = outputData.stream()
.collect(Collectors.groupingBy(r -> String.valueOf(((long) r[2]) /
86400000), Collectors.toList()));
- inputs.add(new Object[]{mapperId, config4, expectedRecords4});
+ inputs.add(new Object[]{config4, expectedRecords4});
// partition by column and then table column partition config
- SegmentMapperConfig config41 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("campaign").build(),
- new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
- .setColumnName("clicks").setColumnPartitionConfig(new
ColumnPartitionConfig("Modulo", 3)).build()));
+ SegmentMapperConfig config41 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+ new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("campaign").build(), new
PartitionerConfig.Builder()
+
.setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG).setColumnName("clicks")
+ .setColumnPartitionConfig(new ColumnPartitionConfig("Modulo",
3)).build()));
Map<String, List<Object[]>> expectedRecords41 = new HashMap<>();
for (Object[] record : outputData) {
String partition = record[0] + "_" + (int) record[1] % 3;
List<Object[]> objects = expectedRecords41.computeIfAbsent(partition, k
-> new ArrayList<>());
objects.add(record);
}
- inputs.add(new Object[]{mapperId, config41, expectedRecords41});
+ inputs.add(new Object[]{config41, expectedRecords41});
// filter function which filters out nothing
- SegmentMapperConfig config5 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({campaign == \"foo\"},
campaign)").build(),
- Lists.newArrayList(new PartitionerConfig.Builder().build()));
+ SegmentMapperConfig config5 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+ .setFilterFunction("Groovy({campaign == \"foo\"},
campaign)").build(),
+ Lists.newArrayList(new PartitionerConfig.Builder().build()));
Map<String, List<Object[]>> expectedRecords5 = new HashMap<>();
expectedRecords5.put("0", outputData);
- inputs.add(new Object[]{mapperId, config5, expectedRecords5});
+ inputs.add(new Object[]{config5, expectedRecords5});
// filter function which filters out everything
- SegmentMapperConfig config6 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({timeValue > 0}, timeValue)").build(),
- Lists.newArrayList(new PartitionerConfig.Builder().build()));
+ SegmentMapperConfig config6 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+ .setFilterFunction("Groovy({timeValue > 0},
timeValue)").build(),
+ Lists.newArrayList(new PartitionerConfig.Builder().build()));
Map<String, List<Object[]>> expectedRecords6 = new HashMap<>();
- inputs.add(new Object[]{mapperId, config6, expectedRecords6});
+ inputs.add(new Object[]{config6, expectedRecords6});
// filter function which filters out certain times
- SegmentMapperConfig config7 = new SegmentMapperConfig(_pinotSchema, new
RecordTransformerConfig.Builder().build(),
- new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({timeValue < 1597795200000L ||
timeValue >= 1597881600000L}, timeValue)")
- .build(), Lists.newArrayList(new
PartitionerConfig.Builder().build()));
+ SegmentMapperConfig config7 =
+ new SegmentMapperConfig(_tableConfig, _schema, new
RecordTransformerConfig.Builder().build(),
+ new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+ .setFilterFunction("Groovy({timeValue < 1597795200000L ||
timeValue >= 1597881600000L}, timeValue)")
+ .build(), Lists.newArrayList(new
PartitionerConfig.Builder().build()));
Map<String, List<Object[]>> expectedRecords7 =
outputData.stream().filter(r -> ((long) r[2]) >= 1597795200000L &&
((long) r[2]) < 1597881600000L)
.collect(Collectors.groupingBy(r -> "0", Collectors.toList()));
- inputs.add(new Object[]{mapperId, config7, expectedRecords7});
+ inputs.add(new Object[]{config7, expectedRecords7});
// record transformation - round timeValue to nearest day
Map<String, String> transformFunctionMap = new HashMap<>();
transformFunctionMap.put("timeValue", "round(timeValue, 86400000)");
- SegmentMapperConfig config9 = new SegmentMapperConfig(_pinotSchema,
+ SegmentMapperConfig config9 = new SegmentMapperConfig(_tableConfig,
_schema,
new
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
new RecordFilterConfig.Builder().build(), Lists.newArrayList(new
PartitionerConfig.Builder().build()));
List<Object[]> transformedData = new ArrayList<>();
outputData.forEach(r -> transformedData.add(new Object[]{r[0], r[1],
(((long) r[2]) / 86400000) * 86400000}));
Map<String, List<Object[]>> expectedRecords9 = new HashMap<>();
expectedRecords9.put("0", transformedData);
- inputs.add(new Object[]{mapperId, config9, expectedRecords9});
+ inputs.add(new Object[]{config9, expectedRecords9});
// record transformation - round timeValue to nearest day, partition on
timeValue
- SegmentMapperConfig config10 = new SegmentMapperConfig(_pinotSchema,
+ SegmentMapperConfig config10 = new SegmentMapperConfig(_tableConfig,
_schema,
new
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
new RecordFilterConfig.Builder().build(), Lists.newArrayList(
new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
.setColumnName("timeValue").build()));
Map<String, List<Object[]>> expectedRecords10 =
transformedData.stream().collect(Collectors.groupingBy(r ->
String.valueOf(r[2]), Collectors.toList()));
- inputs.add(new Object[]{mapperId, config10, expectedRecords10});
+ inputs.add(new Object[]{config10, expectedRecords10});
// record transformation - round timeValue to nearest day, partition on
timeValue, filter out timeValues
- SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
+ SegmentMapperConfig config11 = new SegmentMapperConfig(_tableConfig,
_schema,
new
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue
>= 1597881600000}, timeValue)").build(), Lists.newArrayList(
- new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("timeValue").build()));
+ .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue
>= 1597881600000}, timeValue)").build(),
+ Lists.newArrayList(
+ new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("timeValue").build()));
Map<String, List<Object[]>> expectedRecords11 =
transformedData.stream().filter(r -> ((long) r[2]) == 1597795200000L)
.collect(Collectors.groupingBy(r -> "1597795200000",
Collectors.toList()));
- inputs.add(new Object[]{mapperId, config11, expectedRecords11});
+ inputs.add(new Object[]{config11, expectedRecords11});
return inputs.toArray(new Object[0][]);
}
@AfterClass
- public void after() {
- FileUtils.deleteQuietly(_baseDir);
+ public void tearDown() {
+ FileUtils.deleteQuietly(TEMP_DIR);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
index 26ae032..f393f5e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
@@ -23,20 +23,20 @@ import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
import
org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
-import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -55,60 +55,47 @@ import static org.testng.Assert.assertTrue;
* Tests for {@link SegmentReducer}
*/
public class SegmentReducerTest {
-
- private File _baseDir;
- private File _partDir;
- private Schema _pinotSchema;
- private final List<Object[]> _rawData1597795200000L = Lists
- .newArrayList(new Object[]{"abc", 4000, 1597795200000L}, new
Object[]{"abc", 3000, 1597795200000L},
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"SegmentReducerTest");
+
+ private final File _mapperOutputDir = new File(TEMP_DIR,
"mapper_output/1597795200000");
+ private final Schema _pinotSchema = new
Schema.SchemaBuilder().setSchemaName("mySchema")
+ .addSingleValueDimension("campaign",
FieldSpec.DataType.STRING).addMetric("clicks", FieldSpec.DataType.INT)
+ .addDateTime("timeValue", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+ private final List<Object[]> _rawData1597795200000L = Arrays
+ .asList(new Object[]{"abc", 4000, 1597795200000L}, new Object[]{"abc",
3000, 1597795200000L},
new Object[]{"pqr", 1000, 1597795200000L}, new Object[]{"xyz", 4000,
1597795200000L},
new Object[]{"pqr", 1000, 1597795200000L});
+ private GenericRowFileManager _fileManager;
+
@BeforeClass
- public void before()
+ public void setUp()
throws IOException {
- _baseDir = new File(FileUtils.getTempDirectory(), "segment_reducer_test_"
+ System.currentTimeMillis());
- FileUtils.deleteQuietly(_baseDir);
- assertTrue(_baseDir.mkdirs());
-
- // mapper output directory/partition directory
- _partDir = new File(_baseDir, "mapper_output/1597795200000");
- assertTrue(_partDir.mkdirs());
-
- _pinotSchema = new Schema.SchemaBuilder().setSchemaName("mySchema")
- .addSingleValueDimension("campaign",
FieldSpec.DataType.STRING).addMetric("clicks", FieldSpec.DataType.INT)
- .addDateTime("timeValue", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
- org.apache.avro.Schema avroSchema =
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
-
- // create 2 avro files
- DataFileWriter<GenericData.Record> recordWriter1 = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema));
- recordWriter1.create(avroSchema, new File(_partDir, "map1.avro"));
- DataFileWriter<GenericData.Record> recordWriter2 = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema));
- recordWriter2.create(avroSchema, new File(_partDir, "map2.avro"));
+ FileUtils.deleteQuietly(TEMP_DIR);
+ assertTrue(_mapperOutputDir.mkdirs());
+
+ List<FieldSpec> fieldSpecs =
SegmentProcessingUtils.getFieldSpecs(_pinotSchema);
+ _fileManager = new GenericRowFileManager(_mapperOutputDir, fieldSpecs,
false);
+ GenericRowFileWriter fileWriter = _fileManager.getFileWriter();
+ GenericRow reuse = new GenericRow();
for (int i = 0; i < 5; i++) {
- GenericData.Record record = new GenericData.Record(avroSchema);
- record.put("campaign", _rawData1597795200000L.get(i)[0]);
- record.put("clicks", _rawData1597795200000L.get(i)[1]);
- record.put("timeValue", _rawData1597795200000L.get(i)[2]);
- if (i < 2) {
- recordWriter1.append(record);
- } else {
- recordWriter2.append(record);
- }
+ reuse.putValue("campaign", _rawData1597795200000L.get(i)[0]);
+ reuse.putValue("clicks", _rawData1597795200000L.get(i)[1]);
+ reuse.putValue("timeValue", _rawData1597795200000L.get(i)[2]);
+ fileWriter.write(reuse);
+ reuse.clear();
}
- recordWriter1.close();
- recordWriter2.close();
+ _fileManager.closeFileWriter();
}
@Test(dataProvider = "segmentReducerDataProvider")
public void segmentReducerTest(String reducerId, SegmentReducerConfig
reducerConfig, Set<String> expectedFileNames,
List<Object[]> expectedRecords, Comparator comparator)
throws Exception {
-
- File reducerOutputDir = new File(_baseDir, "reducer_output");
+ File reducerOutputDir = new File(TEMP_DIR, "reducer_output");
FileUtils.deleteQuietly(reducerOutputDir);
assertTrue(reducerOutputDir.mkdirs());
- SegmentReducer segmentReducer = new SegmentReducer(reducerId, _partDir,
reducerConfig, reducerOutputDir);
+ SegmentReducer segmentReducer = new SegmentReducer(reducerId,
_fileManager, reducerConfig, reducerOutputDir);
segmentReducer.reduce();
segmentReducer.cleanup();
@@ -210,7 +197,9 @@ public class SegmentReducerTest {
}
@AfterClass
- public void after() {
- FileUtils.deleteQuietly(_baseDir);
+ public void tearDown()
+ throws IOException {
+ _fileManager.cleanUp();
+ FileUtils.deleteQuietly(TEMP_DIR);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]