This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 91ffcc759f Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues (#12220)
91ffcc759f is described below
commit 91ffcc759f4f178cbfb6b9a1541069734b4c0031
Author: aishikbh <[email protected]>
AuthorDate: Wed Jan 24 11:17:29 2024 +0530
Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues (#12220)
* Modularized the map, reduce and segment generation phases.
* Added interfaces and classes.
* Added StatefulRecordReaderFileConfig instead of RecordReaderFileConfig.
* Saved progress.
* Major changes, working tests.
* More changes.
* Made the number of bytes configurable.
* Added a global sequence id to segments since segment generation is modularised out; made the intermediate file size configurable through segmentsConfig.
* Removed the logic for global segment IDs.
* Used RecordReaderConfig instead of StatefulRecordReaderConfig, changed looping conditions, and made the RecordReader of RecordReaderFileConfig modifiable.
* Added proper sequence ids for segments.
* Ingestion working without errors; should be good in sunny-day scenarios.
* Removed a typo from testing.
* Replaced _partitionToFileManagerMap with a local variable as the mapping happens across multiple iterations; changed exception handling for the mapper.
* Added a precondition check for intermediateFileSizeThresholdInBytes and inferred the observer from segmentsConfig.
* Fixed a typo and added a comment.
* Removed a redundant method.
* Config-related changes; fixed unit tests.
* Removed a redundant flag.
* Replaced the size-based constraint checker with a size-based constraint writer.
* Consolidated all the constraint checks into one place.
* Added a test to validate SegmentProcessorFramework.
* Decoupled the RecordReader closing logic from SegmentMapper.
* Addressed comments related to SegmentProcessorFramework.
* Delegated initialisation, and the check for a RecordReader being done with all its records, to RecordReaderFileConfig.
* Simplified the logic for terminating the map phase.
* Changed variable names for better readability.
* Added tests and minor changes in logic:
- Added further tests for SegmentProcessorFramework.
- Removed redundant checks in SegmentProcessorFramework.
- Detected an edge case and added an additional statement in SegmentMapper.
* Isolated the termination logs to mapAndTransformRow.
* Kept the SegmentMapper public interface unchanged:
- Pass the total count of RecordReaders through the SegmentMapper constructor instead of map() arguments.
* Addressed review comments.
* Cleaned up the logging logic.
* Changed logs:
* Kept the logging for the default scenario the same as before.
* Conditionally logged the progress between iterations when the feature is enabled.
* Modified tests:
- PR #12290 touched dimBaseballTeams.csv, on which the tests depend; modified the tests to reflect that.
- Changed import statements (addressed comment).
---
.../apache/pinot/core/common/MinionConstants.java | 1 +
.../processing/framework/SegmentConfig.java | 30 ++-
.../framework/SegmentProcessorFramework.java | 139 ++++++++++----
.../genericrow/AdaptiveConstraintsWriter.java | 33 ++++
.../genericrow/AdaptiveSizeBasedWriter.java | 51 ++++++
.../segment/processing/genericrow/FileWriter.java | 30 +++
.../genericrow/GenericRowFileWriter.java | 11 +-
.../segment/processing/mapper/SegmentMapper.java | 70 ++++---
.../framework/SegmentProcessorFrameworkTest.java | 203 +++++++++++++++++++++
.../pinot/plugin/minion/tasks/MergeTaskUtils.java | 4 +
.../plugin/minion/tasks/MergeTaskUtilsTest.java | 6 +-
.../spi/data/readers/RecordReaderFileConfig.java | 51 +++++-
12 files changed, 561 insertions(+), 68 deletions(-)
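
The core knob this patch adds is a per-iteration byte threshold for the mapper's intermediate output, exposed on SegmentConfig. Below is a minimal sketch of how a caller might set it through the builder shown in the diff; the 1 GB value is an arbitrary illustration, and the default of Long.MAX_VALUE preserves the previous single-pass behaviour.

    import org.apache.pinot.core.segment.processing.framework.SegmentConfig;

    public class SegmentConfigThresholdSketch {
      public static void main(String[] args) {
        // Cap each mapper iteration's intermediate output at ~1 GB; once the threshold is
        // crossed, the framework reduces and creates segments before mapping resumes.
        SegmentConfig segmentConfig = new SegmentConfig.Builder()
            .setMaxNumRecordsPerSegment(5_000_000)
            .setIntermediateFileSizeThreshold(1_000_000_000L)
            .setSegmentNamePrefix("myPrefix")
            .setSegmentNamePostfix("myPostfix")
            .build();
        System.out.println(segmentConfig.getIntermediateFileSizeThreshold());
      }
    }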
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 8e4afd465a..4193984559 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -104,6 +104,7 @@ public class MinionConstants {
// Segment config
public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
+ public static final String SEGMENT_MAPPER_FILE_SIZE_IN_BYTES = "segmentMapperFileSizeThresholdInBytes";
public static final String MAX_NUM_PARALLEL_BUCKETS = "maxNumParallelBuckets";
public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
public static final String SEGMENT_NAME_POSTFIX_KEY = "segmentNamePostfix";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
index b3302fafc0..95191b658b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
@@ -31,22 +31,28 @@ import javax.annotation.Nullable;
@JsonIgnoreProperties(ignoreUnknown = true)
public class SegmentConfig {
public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+ public static final long DEFAULT_SEGMENT_MAPPER_FILE_SIZE_IN_BYTES =
Long.MAX_VALUE;
private final int _maxNumRecordsPerSegment;
private final String _segmentNamePrefix;
private final String _segmentNamePostfix;
private final String _fixedSegmentName;
+ private final long _segmentMapperFileSizeThresholdInBytes;
@JsonCreator
private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment",
required = true) int maxNumRecordsPerSegment,
@JsonProperty("segmentNamePrefix") @Nullable String segmentNamePrefix,
@JsonProperty("segmentNamePostfix") @Nullable String segmentNamePostfix,
- @JsonProperty("fixedSegmentName") @Nullable String fixedSegmentName) {
+ @JsonProperty("fixedSegmentName") @Nullable String fixedSegmentName,
+ @JsonProperty(value = "segmentMapperFileSizeThresholdInBytes", required
= true)
+ long segmentMapperFileSizeThresholdInBytes) {
Preconditions.checkState(maxNumRecordsPerSegment > 0, "Max num records per
segment must be > 0");
+ Preconditions.checkState(segmentMapperFileSizeThresholdInBytes > 0,
"Intermediate file size threshold must be > 0");
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
_segmentNamePrefix = segmentNamePrefix;
_segmentNamePostfix = segmentNamePostfix;
_fixedSegmentName = fixedSegmentName;
+ _segmentMapperFileSizeThresholdInBytes =
segmentMapperFileSizeThresholdInBytes;
}
/**
@@ -71,15 +77,21 @@ public class SegmentConfig {
return _fixedSegmentName;
}
+ public long getIntermediateFileSizeThreshold() {
+ return _segmentMapperFileSizeThresholdInBytes;
+ }
+
/**
* Builder for SegmentConfig
*/
public static class Builder {
private int _maxNumRecordsPerSegment = DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
+ private long _segmentMapperFileSizeThresholdInBytes =
DEFAULT_SEGMENT_MAPPER_FILE_SIZE_IN_BYTES;
private String _segmentNamePrefix;
private String _segmentNamePostfix;
private String _fixedSegmentName;
+
public Builder setMaxNumRecordsPerSegment(int maxNumRecordsPerSegment) {
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
return this;
@@ -99,17 +111,25 @@ public class SegmentConfig {
_fixedSegmentName = fixedSegmentName;
return this;
}
+ public Builder setIntermediateFileSizeThreshold(long
segmentMapperFileSizeThresholdInBytes) {
+ _segmentMapperFileSizeThresholdInBytes =
segmentMapperFileSizeThresholdInBytes;
+ return this;
+ }
public SegmentConfig build() {
Preconditions.checkState(_maxNumRecordsPerSegment > 0, "Max num records
per segment must be > 0");
- return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix,
_segmentNamePostfix, _fixedSegmentName);
+ Preconditions.checkState(_segmentMapperFileSizeThresholdInBytes > 0,
+ "Intermediate file size threshold must be > 0");
+ return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix,
_segmentNamePostfix, _fixedSegmentName,
+ _segmentMapperFileSizeThresholdInBytes);
}
}
@Override
public String toString() {
- return "SegmentConfig{" + "_maxNumRecordsPerSegment=" +
_maxNumRecordsPerSegment + ", _segmentNamePrefix='"
- + _segmentNamePrefix + '\'' + ", _segmentNamePostfix='" +
_segmentNamePostfix + '\'' + ", _fixedSegmentName='"
- + _fixedSegmentName + '\'' + '}';
+ return "SegmentConfig{" + "_maxNumRecordsPerSegment=" +
_maxNumRecordsPerSegment
+ + ", _segmentMapperFileSizeThresholdInBytes=" +
_segmentMapperFileSizeThresholdInBytes
+ + ", _segmentNamePrefix='" + _segmentNamePrefix + '\'' + ",
_segmentNamePostfix='" + _segmentNamePostfix + '\''
+ + ", _fixedSegmentName='" + _fixedSegmentName + '\'' + '}';
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index fb5a08ff56..868583d0d3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -66,8 +66,8 @@ public class SegmentProcessorFramework {
private final File _mapperOutputDir;
private final File _reducerOutputDir;
private final File _segmentsOutputDir;
- private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
private final SegmentNumRowProvider _segmentNumRowProvider;
+ private int _segmentSequenceId = 0;
/**
* Initializes the SegmentProcessorFramework with record readers, config and
working directory. We will now rely on
@@ -124,14 +124,8 @@ public class SegmentProcessorFramework {
try {
return doProcess();
} catch (Exception e) {
- // Cleaning up file managers no matter they are from map phase or reduce
phase. For those from reduce phase, the
- // reducers should have cleaned up the corresponding file managers from
map phase already.
- if (_partitionToFileManagerMap != null) {
- for (GenericRowFileManager fileManager :
_partitionToFileManagerMap.values()) {
- fileManager.cleanUp();
- }
- }
- // Cleaning up output dir as processing has failed.
+ // Cleaning up output dir as processing has failed. File managers left from the map or reduce phase will be
+ // cleaned up in the respective phases.
FileUtils.deleteQuietly(_segmentsOutputDir);
throw e;
} finally {
@@ -142,24 +136,103 @@ public class SegmentProcessorFramework {
private List<File> doProcess()
throws Exception {
- // Map phase
- LOGGER.info("Beginning map phase on {} record readers",
_recordReaderFileConfigs.size());
- SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs,
_customRecordTransformers,
- _segmentProcessorConfig, _mapperOutputDir);
- _partitionToFileManagerMap = mapper.map();
-
- // Check for mapper output files
- if (_partitionToFileManagerMap.isEmpty()) {
- LOGGER.info("No partition generated from mapper phase, skipping the
reducer phase");
- return Collections.emptyList();
+ List<File> outputSegmentDirs = new ArrayList<>();
+ int numRecordReaders = _recordReaderFileConfigs.size();
+ int nextRecordReaderIndexToBeProcessed = 0;
+ int iterationCount = 1;
+ Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
+ boolean isMapperOutputSizeThresholdEnabled =
+ _segmentProcessorConfig.getSegmentConfig().getIntermediateFileSizeThreshold() != Long.MAX_VALUE;
+
+ while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
+ // Initialise the mapper. Eliminate the record readers that have been
processed in the previous iterations.
+ SegmentMapper mapper =
+ new
SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed,
numRecordReaders),
+ _customRecordTransformers, _segmentProcessorConfig,
_mapperOutputDir);
+
+ // Log start of iteration details only if intermediate file size
threshold is set.
+ if (isMapperOutputSizeThresholdEnabled) {
+ String logMessage =
+ String.format("Starting iteration %d with %d record readers.
Starting index = %d, end index = %d",
+ iterationCount,
+
_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed,
numRecordReaders).size(),
+ nextRecordReaderIndexToBeProcessed + 1, numRecordReaders);
+ LOGGER.info(logMessage);
+ observer.accept(logMessage);
+ }
+
+ // Map phase.
+ long mapStartTimeInMs = System.currentTimeMillis();
+ Map<String, GenericRowFileManager> partitionToFileManagerMap =
mapper.map();
+
+ // Log the time taken to map.
+ LOGGER.info("Finished iteration {} in {}ms", iterationCount,
System.currentTimeMillis() - mapStartTimeInMs);
+
+ // Check for mapper output files, if no files are generated, skip the
reducer phase and move on to the next
+ // iteration.
+ if (partitionToFileManagerMap.isEmpty()) {
+ LOGGER.info("No mapper output files generated, skipping reduce phase");
+ nextRecordReaderIndexToBeProcessed =
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+ continue;
+ }
+
+ // Reduce phase.
+ doReduce(partitionToFileManagerMap);
+
+ // Segment creation phase. Add the created segments to the final list.
+ outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
+
+ // Store the starting index of the record readers that were processed in
this iteration for logging purposes.
+ int startingProcessedRecordReaderIndex =
nextRecordReaderIndexToBeProcessed;
+
+ // Update next record reader index to be processed.
+ nextRecordReaderIndexToBeProcessed =
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+
+ // Log the details between iterations only if the intermediate file size threshold is set.
+ if (isMapperOutputSizeThresholdEnabled) {
+ // Take care of logging the proper RecordReader index in case of the
last iteration.
+ int boundaryIndexToLog =
+ nextRecordReaderIndexToBeProcessed == numRecordReaders ?
nextRecordReaderIndexToBeProcessed
+ : nextRecordReaderIndexToBeProcessed + 1;
+
+ // The last RecordReader is guaranteed to be completely processed only in the final iteration; otherwise it may
+ // or may not have been fully processed. Log it accordingly.
+ String logMessage;
+ if (nextRecordReaderIndexToBeProcessed == numRecordReaders) {
+ logMessage = String.format("Finished processing all of %d
RecordReaders", numRecordReaders);
+ } else {
+ logMessage = String.format(
+ "Finished processing RecordReaders %d to %d (RecordReader %d
might be partially processed) out of %d in "
+ + "iteration %d", startingProcessedRecordReaderIndex + 1,
boundaryIndexToLog,
+ nextRecordReaderIndexToBeProcessed + 1, numRecordReaders,
iterationCount);
+ }
+
+ observer.accept(logMessage);
+ LOGGER.info(logMessage);
+ }
+
+ iterationCount++;
}
+ return outputSegmentDirs;
+ }
- // Reduce phase
- LOGGER.info("Beginning reduce phase on partitions: {}",
_partitionToFileManagerMap.keySet());
+ private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
+ for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++)
{
+ RecordReaderFileConfig recordReaderFileConfig =
_recordReaderFileConfigs.get(i);
+ if (!recordReaderFileConfig.isRecordReaderDone()) {
+ return i;
+ }
+ }
+ return _recordReaderFileConfigs.size();
+ }
+
+ private void doReduce(Map<String, GenericRowFileManager>
partitionToFileManagerMap)
+ throws Exception {
+ LOGGER.info("Beginning reduce phase on partitions: {}",
partitionToFileManagerMap.keySet());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
- int totalCount = _partitionToFileManagerMap.keySet().size();
+ int totalCount = partitionToFileManagerMap.keySet().size();
int count = 1;
- for (Map.Entry<String, GenericRowFileManager> entry :
_partitionToFileManagerMap.entrySet()) {
+ for (Map.Entry<String, GenericRowFileManager> entry :
partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
observer.accept(
String.format("Doing reduce phase on data from partition: %s (%d out
of %d)", partitionId, count++,
@@ -168,9 +241,11 @@ public class SegmentProcessorFramework {
Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager,
_segmentProcessorConfig, _reducerOutputDir);
entry.setValue(reducer.reduce());
}
+ }
- // Segment creation phase
- LOGGER.info("Beginning segment creation phase on partitions: {}",
_partitionToFileManagerMap.keySet());
+ private List<File> generateSegment(Map<String, GenericRowFileManager>
partitionToFileManagerMap)
+ throws Exception {
+ LOGGER.info("Beginning segment creation phase on partitions: {}",
partitionToFileManagerMap.keySet());
List<File> outputSegmentDirs = new ArrayList<>();
TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
Schema schema = _segmentProcessorConfig.getSchema();
@@ -179,6 +254,7 @@ public class SegmentProcessorFramework {
String fixedSegmentName =
_segmentProcessorConfig.getSegmentConfig().getFixedSegmentName();
SegmentGeneratorConfig generatorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
+ Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null)
{
generatorConfig.setSegmentNameGenerator(
@@ -190,8 +266,7 @@ public class SegmentProcessorFramework {
generatorConfig.setSegmentNamePostfix(segmentNamePostfix);
}
- int sequenceId = 0;
- for (Map.Entry<String, GenericRowFileManager> entry :
_partitionToFileManagerMap.entrySet()) {
+ for (Map.Entry<String, GenericRowFileManager> entry :
partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
GenericRowFileManager fileManager = entry.getValue();
try {
@@ -202,15 +277,15 @@ public class SegmentProcessorFramework {
numSortFields);
GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
int maxNumRecordsPerSegment;
- for (int startRowId = 0; startRowId < numRows; startRowId +=
maxNumRecordsPerSegment, sequenceId++) {
+ for (int startRowId = 0; startRowId < numRows; startRowId +=
maxNumRecordsPerSegment, _segmentSequenceId++) {
maxNumRecordsPerSegment = _segmentNumRowProvider.getNumRows();
int endRowId = Math.min(startRowId + maxNumRecordsPerSegment,
numRows);
- LOGGER.info("Start creating segment of sequenceId: {} with row
range: {} to {}", sequenceId, startRowId,
- endRowId);
+ LOGGER.info("Start creating segment of sequenceId: {} with row
range: {} to {}", _segmentSequenceId,
+ startRowId, endRowId);
observer.accept(String.format(
"Creating segment of sequentId: %d with data from partition: %s
and row range: [%d, %d) out of [0, %d)",
- sequenceId, partitionId, startRowId, endRowId, numRows));
- generatorConfig.setSequenceId(sequenceId);
+ _segmentSequenceId, partitionId, startRowId, endRowId, numRows));
+ generatorConfig.setSequenceId(_segmentSequenceId);
GenericRowFileRecordReader recordReaderForRange =
recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new
RecordReaderSegmentCreationDataSource(recordReaderForRange),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java
new file mode 100644
index 0000000000..d0a3daffa0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.IOException;
+
+
+/**
+ * Interface for a writer which can track constraints. This will be used by
SegmentProcessorFramework.
+ * */
+
+public interface AdaptiveConstraintsWriter<W, D> {
+ boolean canWrite();
+
+ void write(W writer, D dataUnit)
+ throws IOException;
+}
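
The interface is generic over the writer and data-unit types, so constraints other than byte size could be plugged in. A hypothetical row-count-based implementation (not part of this patch) satisfying the same contract might look like this:

    import java.io.IOException;
    import org.apache.pinot.core.segment.processing.genericrow.AdaptiveConstraintsWriter;
    import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
    import org.apache.pinot.spi.data.readers.GenericRow;

    // Hypothetical: stops the mapper after a fixed number of rows instead of a byte budget.
    public class AdaptiveRowCountBasedWriter
        implements AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> {
      private final long _rowLimit;
      private long _numRowsWritten;

      public AdaptiveRowCountBasedWriter(long rowLimit) {
        _rowLimit = rowLimit;
      }

      @Override
      public boolean canWrite() {
        return _numRowsWritten < _rowLimit;
      }

      @Override
      public void write(GenericRowFileWriter writer, GenericRow row) throws IOException {
        writer.writeData(row);
        _numRowsWritten++;
      }
    }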
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
new file mode 100644
index 0000000000..541bd14e26
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.IOException;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class AdaptiveSizeBasedWriter implements
AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> {
+
+ private final long _bytesLimit;
+ private long _numBytesWritten;
+
+ public AdaptiveSizeBasedWriter(long bytesLimit) {
+ _bytesLimit = bytesLimit;
+ _numBytesWritten = 0;
+ }
+
+ public long getBytesLimit() {
+ return _bytesLimit;
+ }
+ public long getNumBytesWritten() {
+ return _numBytesWritten;
+ }
+
+ @Override
+ public boolean canWrite() {
+ return _numBytesWritten < _bytesLimit;
+ }
+
+ @Override
+ public void write(GenericRowFileWriter writer, GenericRow row) throws
IOException {
+ _numBytesWritten += writer.writeData(row);
+ }
+}
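
A rough sketch of the consumption pattern, loosely mirroring the loop SegmentMapper uses in this patch; the helper method name and byteBudget parameter are illustrative, not part of the change.

    import org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter;
    import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.data.readers.RecordReader;

    public class SizeBoundedCopySketch {
      // Copies rows until the reader is exhausted or the byte budget is used up;
      // returns true if the reader was fully drained.
      static boolean copyWithinBudget(RecordReader reader, GenericRowFileWriter writer, long byteBudget)
          throws Exception {
        AdaptiveSizeBasedWriter sizeBasedWriter = new AdaptiveSizeBasedWriter(byteBudget);
        GenericRow reuse = new GenericRow();
        while (reader.hasNext() && sizeBasedWriter.canWrite()) {
          reuse = reader.next(reuse);
          sizeBasedWriter.write(writer, reuse);
          reuse.clear();
        }
        return !reader.hasNext();
      }
    }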
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java
new file mode 100644
index 0000000000..8dd0a7d0a1
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.IOException;
+
+/**
+ * Abstraction for writing data units to a file.
+ * */
+
+public interface FileWriter<T> {
+ void close() throws IOException;
+ long writeData(T dataUnit) throws IOException;
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
index 171488f963..281838a0b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
@@ -38,7 +38,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
*
* TODO: Consider using ByteBuffer instead of OutputStream.
*/
-public class GenericRowFileWriter implements Closeable {
+public class GenericRowFileWriter implements Closeable, FileWriter<GenericRow>
{
private final DataOutputStream _offsetStream;
private final BufferedOutputStream _dataStream;
private final GenericRowSerializer _serializer;
@@ -63,6 +63,15 @@ public class GenericRowFileWriter implements Closeable {
_nextOffset += bytes.length;
}
+ public long writeData(GenericRow genericRow)
+ throws IOException {
+ _offsetStream.writeLong(_nextOffset);
+ byte[] bytes = _serializer.serialize(genericRow);
+ _dataStream.write(bytes);
+ _nextOffset += bytes.length;
+ return bytes.length;
+ }
+
@Override
public void close()
throws IOException {
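
The behavioural addition here is that writeData(...) reports the serialized size of each row, which is what lets AdaptiveSizeBasedWriter keep a running byte count; the pre-existing void write(...) is untouched. A small illustrative helper (not part of the patch):

    import java.io.IOException;
    import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
    import org.apache.pinot.spi.data.readers.GenericRow;

    public class ByteTrackingSketch {
      // Writes all rows and returns the total number of serialized bytes written.
      static long writeAll(GenericRowFileWriter writer, Iterable<GenericRow> rows) throws IOException {
        long totalBytes = 0;
        for (GenericRow row : rows) {
          totalBytes += writer.writeData(row);
        }
        return totalBytes;
      }
    }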
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index b883d775cc..407cbd4dcd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -30,7 +30,9 @@ import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
+import
org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter;
import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
@@ -45,7 +47,6 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
@@ -62,22 +63,20 @@ import org.slf4j.LoggerFactory;
*/
public class SegmentMapper {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentMapper.class);
-
- private List<RecordReaderFileConfig> _recordReaderFileConfigs;
- private List<RecordTransformer> _customRecordTransformers;
private final SegmentProcessorConfig _processorConfig;
private final File _mapperOutputDir;
-
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
private final int _numSortFields;
-
private final CompositeTransformer _recordTransformer;
private final TimeHandler _timeHandler;
private final Partitioner[] _partitioners;
private final String[] _partitionsBuffer;
// NOTE: Use TreeMap so that the order is deterministic
private final Map<String, GenericRowFileManager> _partitionToFileManagerMap
= new TreeMap<>();
+ private AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;
+ private List<RecordReaderFileConfig> _recordReaderFileConfigs;
+ private List<RecordTransformer> _customRecordTransformers;
public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
List<RecordTransformer> customRecordTransformers, SegmentProcessorConfig
processorConfig, File mapperOutputDir) {
@@ -107,6 +106,10 @@ public class SegmentMapper {
LOGGER.info("Initialized mapper with {} record readers, output dir: {},
timeHandler: {}, partitioners: {}",
_recordReaderFileConfigs.size(), _mapperOutputDir,
_timeHandler.getClass(),
Arrays.stream(_partitioners).map(p ->
p.getClass().toString()).collect(Collectors.joining(",")));
+
+ // initialize adaptive writer.
+ _adaptiveSizeBasedWriter =
+ new
AdaptiveSizeBasedWriter(processorConfig.getSegmentConfig().getIntermediateFileSizeThreshold());
}
/**
@@ -118,7 +121,7 @@ public class SegmentMapper {
try {
return doMap();
} catch (Exception e) {
- // Cleaning up resources created by the mapper, leaving others to the
caller like the input _recordReaders.
+ // Cleaning up resources created by the mapper.
for (GenericRowFileManager fileManager :
_partitionToFileManagerMap.values()) {
fileManager.cleanUp();
}
@@ -129,40 +132,39 @@ public class SegmentMapper {
private Map<String, GenericRowFileManager> doMap()
throws Exception {
Consumer<Object> observer = _processorConfig.getProgressObserver();
- int totalCount = _recordReaderFileConfigs.size();
int count = 1;
+ int totalNumRecordReaders = _recordReaderFileConfigs.size();
GenericRow reuse = new GenericRow();
for (RecordReaderFileConfig recordReaderFileConfig :
_recordReaderFileConfigs) {
- RecordReader recordReader = recordReaderFileConfig._recordReader;
- if (recordReader == null) {
- // We create and use the recordReader here.
- try {
- recordReader =
-
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat,
recordReaderFileConfig._dataFile,
- recordReaderFileConfig._fieldsToRead,
recordReaderFileConfig._recordReaderConfig);
- mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
- } finally {
- if (recordReader != null) {
- recordReader.close();
- }
- }
- } else {
- mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+
+ // The mapper can terminate midway through reading a file if the intermediate file size has crossed the
+ // configured threshold. The map phase will continue in the next iteration right where the current iteration
+ // left off.
+ boolean shouldMapperTerminate =
+ !completeMapAndTransformRow(recordReader, reuse, observer, count,
totalNumRecordReaders);
+
+ // Terminate the map phase if intermediate file size has crossed the
threshold.
+ if (shouldMapperTerminate) {
+ break;
}
+ recordReaderFileConfig.closeRecordReader();
count++;
}
for (GenericRowFileManager fileManager :
_partitionToFileManagerMap.values()) {
fileManager.closeFileWriter();
}
-
return _partitionToFileManagerMap;
}
- private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+
+// Returns true if the map phase can continue, false if it should terminate based on the configured threshold
+// for intermediate file size during the map phase.
+ private boolean completeMapAndTransformRow(RecordReader recordReader,
GenericRow reuse,
Consumer<Object> observer, int count, int totalCount) throws Exception {
observer.accept(String.format("Doing map phase on data from RecordReader
(%d out of %d)", count, totalCount));
- while (recordReader.hasNext()) {
+ while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
reuse = recordReader.next(reuse);
// TODO: Add ComplexTypeTransformer here. Currently it is not idempotent
so cannot add it
@@ -183,6 +185,16 @@ public class SegmentMapper {
}
reuse.clear();
}
+ if (recordReader.hasNext() && !_adaptiveSizeBasedWriter.canWrite()) {
+ String logMessage = String.format(
+ "Stopping record readers at index: %d out of %d passed to mapper as
size limit reached, bytes written = %d,"
+ + " bytes " + "limit = %d", count, totalCount,
_adaptiveSizeBasedWriter.getNumBytesWritten(),
+ _adaptiveSizeBasedWriter.getBytesLimit());
+ observer.accept(logMessage);
+ LOGGER.info(logMessage);
+ return false;
+ }
+ return true;
}
private void writeRecord(GenericRow row)
@@ -210,6 +222,10 @@ public class SegmentMapper {
_partitionToFileManagerMap.put(partition, fileManager);
}
- fileManager.getFileWriter().write(row);
+ // Get the file writer.
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+
+ // Write the row.
+ _adaptiveSizeBasedWriter.write(fileWriter, row);
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 8b3e150a1b..0ec9261d92 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -56,8 +56,10 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
@@ -581,6 +583,207 @@ public class SegmentProcessorFrameworkTest {
rewindRecordReaders(_multipleSegments);
}
+ @Test
+ public void testConfigurableMapperOutputSize()
+ throws Exception {
+ File workingDir = new File(TEMP_DIR, "configurable_mapper_test_output");
+ FileUtils.forceMkdir(workingDir);
+ int expectedTotalDocsCount = 10;
+
+ // Test 1 : Default case i.e. no limit to mapper output file size (single
record reader).
+
+ SegmentProcessorConfig config =
+ new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ SegmentProcessorFramework framework = new
SegmentProcessorFramework(_singleSegment, config, workingDir);
+ List<File> outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ String[] outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1,
Arrays.toString(outputDirs));
+ SegmentMetadata segmentMetadata = new
SegmentMetadataImpl(outputSegments.get(0));
+ assertEquals(segmentMetadata.getTotalDocs(), expectedTotalDocsCount);
+ assertEquals(segmentMetadata.getName(),
"myTable_1597719600000_1597892400000_0");
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
+
+ // Test 2 : Default case i.e. no limit to mapper output file size
(multiple record readers).
+ config = new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ framework = new SegmentProcessorFramework(_multipleSegments, config,
workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1,
Arrays.toString(outputDirs));
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
+ assertEquals(segmentMetadata.getTotalDocs(), expectedTotalDocsCount);
+ assertEquals(segmentMetadata.getName(),
"myTable_1597719600000_1597892400000_0");
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_multipleSegments);
+
+ // Test 3 : Test mapper with threshold output size (single record reader).
+
+ // Create a segmentConfig with intermediate mapper output size threshold
set to the number of bytes in each row
+ // from the data. In this way, we can test if each row is written to a
separate segment.
+ SegmentConfig segmentConfig =
+ new
SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix")
+ .setSegmentNamePostfix("testPostfix").build();
+ config = new
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig)
+ .setSchema(_schema).build();
+ framework = new SegmentProcessorFramework(_singleSegment, config,
workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), expectedTotalDocsCount);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1,
Arrays.toString(outputDirs));
+
+ // Verify that each segment has only one row, and the segment name is
correct.
+
+ for (int i = 0; i < expectedTotalDocsCount; i++) {
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+ assertEquals(segmentMetadata.getTotalDocs(), 1);
+
assertTrue(segmentMetadata.getName().matches("testPrefix_.*_testPostfix_" + i));
+ }
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
+
+ // Test 4 : Test mapper with threshold output size (multiple record
readers).
+
+ // Create a segmentConfig with intermediate mapper output size threshold
set to the number of bytes in each row
+ // from the data. In this way, we can test if each row is written to a
separate segment.
+ segmentConfig = new
SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix")
+ .setSegmentNamePostfix("testPostfix").build();
+ config = new
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig)
+ .setSchema(_schema).build();
+ framework = new SegmentProcessorFramework(_multipleSegments, config,
workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), expectedTotalDocsCount);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1,
Arrays.toString(outputDirs));
+
+ // Verify that each segment has only one row, and the segment name is
correct.
+
+ for (int i = 0; i < expectedTotalDocsCount; i++) {
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+ assertEquals(segmentMetadata.getTotalDocs(), 1);
+
assertTrue(segmentMetadata.getName().matches("testPrefix_.*_testPostfix_" + i));
+ }
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_multipleSegments);
+
+ // Test 5 : Test with injected failure in mapper to verify output
directory is cleaned up.
+
+ List<RecordReader> testList = new ArrayList<>(_multipleSegments);
+ testList.set(1, null);
+ segmentConfig = new
SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix")
+ .setSegmentNamePostfix("testPostfix").build();
+ config = new
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig)
+ .setSchema(_schema).build();
+ SegmentProcessorFramework failureTest = new
SegmentProcessorFramework(testList, config, workingDir);
+ assertThrows(NullPointerException.class, failureTest::process);
+ assertTrue(FileUtils.isEmptyDirectory(workingDir));
+ rewindRecordReaders(_multipleSegments);
+
+ // Test 6: RecordReader should be closed when recordReader is created
inside RecordReaderFileConfig (without mapper
+ // output size threshold configured).
+
+ ClassLoader classLoader = getClass().getClassLoader();
+ URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
+ RecordReaderFileConfig recordReaderFileConfig =
+ new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()),
null, null);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("teamId",
DataType.STRING, "")
+ .addSingleValueDimension("teamName", DataType.STRING, "")
+ .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:MILLISECONDS").build();
+
+ config = new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
+
+ SegmentProcessorFramework frameworkWithRecordReaderFileConfig =
+ new SegmentProcessorFramework(config, workingDir,
ImmutableList.of(recordReaderFileConfig),
+ Collections.emptyList(), null);
+ outputSegments = frameworkWithRecordReaderFileConfig.process();
+
+ // Verify the number of segments created and the total docs.
+ assertEquals(outputSegments.size(), 1);
+ ImmutableSegment segment =
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
+ segmentMetadata = segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 52);
+
+ // Verify that the record reader is closed from RecordReaderFileConfig.
+
assertTrue(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+ FileUtils.cleanDirectory(workingDir);
+
+ // Test 7: RecordReader should not be closed when recordReader is passed
to RecordReaderFileConfig. (Without
+ // mapper output size threshold configured)
+
+ RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+ recordReader.rewind();
+
+ // Pass the recordReader to RecordReaderFileConfig.
+ recordReaderFileConfig = new RecordReaderFileConfig(recordReader);
+ SegmentProcessorFramework frameworkWithDelegateRecordReader =
+ new SegmentProcessorFramework(config, workingDir,
ImmutableList.of(recordReaderFileConfig),
+ Collections.emptyList(), null);
+ outputSegments = frameworkWithDelegateRecordReader.process();
+
+ // Verify the number of segments created and the total docs.
+ assertEquals(outputSegments.size(), 1);
+ segment = ImmutableSegmentLoader.load(outputSegments.get(0),
ReadMode.mmap);
+ segmentMetadata = segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 52);
+
+ // Verify that the record reader is not closed from RecordReaderFileConfig.
+
assertFalse(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+ FileUtils.cleanDirectory(workingDir);
+
+ // Test 8: RecordReader should be closed when recordReader is created
inside RecordReaderFileConfig (With mapper
+ // output size threshold configured).
+
+ expectedTotalDocsCount = 52;
+ recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new
File(resource.toURI()), null, null);
+
+ segmentConfig = new
SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix")
+ .setSegmentNamePostfix("testPostfix").build();
+ config = new
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(tableConfig)
+ .setSchema(schema).build();
+
+ frameworkWithRecordReaderFileConfig =
+ new SegmentProcessorFramework(config, workingDir,
ImmutableList.of(recordReaderFileConfig),
+ Collections.emptyList(), null);
+ outputSegments = frameworkWithRecordReaderFileConfig.process();
+
+ // Verify that each segment has only one row.
+ for (int i = 0; i < expectedTotalDocsCount; i++) {
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+ assertEquals(segmentMetadata.getTotalDocs(), 1);
+ }
+
+ // Verify that the record reader is closed from RecordReaderFileConfig.
+
assertTrue(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+ FileUtils.cleanDirectory(workingDir);
+
+ // Test 9: RecordReader should not be closed when recordReader is passed
to RecordReaderFileConfig (With mapper
+ // output size threshold configured).
+
+ recordReader = recordReaderFileConfig.getRecordReader();
+ recordReader.rewind();
+
+ // Pass the recordReader to RecordReaderFileConfig.
+ recordReaderFileConfig = new RecordReaderFileConfig(recordReader);
+ frameworkWithDelegateRecordReader =
+ new SegmentProcessorFramework(config, workingDir,
ImmutableList.of(recordReaderFileConfig),
+ Collections.emptyList(), null);
+ outputSegments = frameworkWithDelegateRecordReader.process();
+
+ // Verify that each segment has only one row.
+ for (int i = 0; i < expectedTotalDocsCount; i++) {
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+ assertEquals(segmentMetadata.getTotalDocs(), 1);
+ }
+
+ // Verify that the record reader is not closed from RecordReaderFileConfig.
+
assertFalse(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+ FileUtils.cleanDirectory(workingDir);
+ }
+
@Test
public void testMultiValue()
throws Exception {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 26c9e59e39..233dfd7f03 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -147,6 +147,10 @@ public class MergeTaskUtils {
if (maxNumRecordsPerSegment != null) {
segmentConfigBuilder.setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment));
}
+ String segmentMapperFileSizeThreshold =
taskConfig.get(MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES);
+ if (segmentMapperFileSizeThreshold != null) {
+
segmentConfigBuilder.setIntermediateFileSizeThreshold(Long.parseLong(segmentMapperFileSizeThreshold));
+ }
segmentConfigBuilder.setSegmentNamePrefix(taskConfig.get(MergeTask.SEGMENT_NAME_PREFIX_KEY));
segmentConfigBuilder.setSegmentNamePostfix(taskConfig.get(MergeTask.SEGMENT_NAME_POSTFIX_KEY));
segmentConfigBuilder.setFixedSegmentName(taskConfig.get(MergeTask.FIXED_SEGMENT_NAME_KEY));
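
For minion merge/rollup tasks, the threshold flows in through the task config under the new MinionConstants key. A sketch of the expected wiring, assuming the key is exposed as MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES as in the test below:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.common.MinionConstants.MergeTask;
    import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
    import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;

    public class MergeTaskThresholdSketch {
      public static void main(String[] args) {
        Map<String, String> taskConfig = new HashMap<>();
        // "segmentMapperFileSizeThresholdInBytes" -> ~1 GB of intermediate mapper output per iteration.
        taskConfig.put(MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES, "1000000000");
        SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig);
        System.out.println(segmentConfig.getIntermediateFileSizeThreshold()); // 1000000000
      }
    }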
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
index bc6b897211..731607784b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
@@ -178,15 +178,17 @@ public class MergeTaskUtilsTest {
taskConfig.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "myPrefix");
taskConfig.put(MergeTask.SEGMENT_NAME_POSTFIX_KEY, "myPostfix");
taskConfig.put(MergeTask.FIXED_SEGMENT_NAME_KEY, "mySegment");
+ taskConfig.put(MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES, "1000000000");
SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig);
assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 10000);
assertEquals(segmentConfig.getSegmentNamePrefix(), "myPrefix");
assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix");
assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix");
assertEquals(segmentConfig.getFixedSegmentName(), "mySegment");
+ assertEquals(segmentConfig.getIntermediateFileSizeThreshold(),
1000000000L);
assertEquals(segmentConfig.toString(),
- "SegmentConfig{_maxNumRecordsPerSegment=10000,
_segmentNamePrefix='myPrefix', "
- + "_segmentNamePostfix='myPostfix',
_fixedSegmentName='mySegment'}");
+ "SegmentConfig{_maxNumRecordsPerSegment=10000,
_segmentMapperFileSizeThresholdInBytes=1000000000, "
+ + "_segmentNamePrefix='myPrefix', _segmentNamePostfix='myPostfix',
_fixedSegmentName='mySegment'}");
segmentConfig = MergeTaskUtils.getSegmentConfig(Collections.emptyMap());
assertEquals(segmentConfig.getMaxNumRecordsPerSegment(),
SegmentConfig.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index 8a2ebb9e16..51e4ed0cfb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -34,8 +34,13 @@ public class RecordReaderFileConfig {
public final File _dataFile;
public final Set<String> _fieldsToRead;
public final RecordReaderConfig _recordReaderConfig;
+ private final boolean _isDelegateReader;
// Record Readers created/passed from clients.
- public final RecordReader _recordReader;
+ public RecordReader _recordReader;
+ // Track whether RecordReaderFileConfig initialized the RecordReader, to aid in closing it.
+ private boolean _isRecordReaderInitialized;
+ // Track if RecordReaderFileConfig closed the RecordReader for testing
purposes.
+ private boolean _isRecordReaderClosed;
// Pass in the info needed to initialize the reader
public RecordReaderFileConfig(FileFormat fileFormat, File dataFile,
Set<String> fieldsToRead,
@@ -45,6 +50,11 @@ public class RecordReaderFileConfig {
_fieldsToRead = fieldsToRead;
_recordReaderConfig = recordReaderConfig;
_recordReader = null;
+ // This is not a delegate RecordReader i.e. RecordReaderFileConfig owns the RecordReader, so it should be closed
+ // by RecordReaderFileConfig as well.
+ _isDelegateReader = false;
+ _isRecordReaderInitialized = false;
+ _isRecordReaderClosed = false;
}
// Pass in the reader instance directly
@@ -54,5 +64,44 @@ public class RecordReaderFileConfig {
_dataFile = null;
_fieldsToRead = null;
_recordReaderConfig = null;
+ // This is a delegate RecordReader, i.e. a RecordReader instance has been passed to RecordReaderFileConfig
+ // instead of the configs. RecordReaderFileConfig does not own the RecordReader, so it should not close it;
+ // the responsibility of closing the RecordReader lies with the caller.
+ _isDelegateReader = true;
+ _isRecordReaderInitialized = true;
+ _isRecordReaderClosed = false;
+ }
+
+ // Return the RecordReader instance. Initialize the RecordReader if not
already initialized.
+ public RecordReader getRecordReader()
+ throws Exception {
+ if (!_isRecordReaderInitialized) {
+ _recordReader = RecordReaderFactory.getRecordReader(_fileFormat,
_dataFile, _fieldsToRead, _recordReaderConfig);
+ _isRecordReaderInitialized = true;
+ }
+ return _recordReader;
+ }
+
+ // Close the RecordReader instance if RecordReaderFileConfig initialized it.
+ public void closeRecordReader()
+ throws Exception {
+ // If RecordReaderFileConfig did not create the RecordReader, then it
should not close it.
+ if (_isRecordReaderInitialized && !_isDelegateReader) {
+ _recordReader.close();
+ _isRecordReaderClosed = true;
+ }
+ }
+
+ // Return true if RecordReader is done processing.
+ public boolean isRecordReaderDone() {
+ if (_isRecordReaderInitialized) {
+ return !_recordReader.hasNext();
+ }
+ return false;
+ }
+
+ // For testing purposes only.
+ public boolean isRecordReaderClosedFromRecordReaderFileConfig() {
+ return _isRecordReaderClosed;
}
}
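
To summarize the ownership rules the new fields encode: a config-created reader is closed by RecordReaderFileConfig, while a caller-supplied (delegate) reader is not. A minimal sketch, assuming a hypothetical CSV path for illustration:

    import java.io.File;
    import org.apache.pinot.spi.data.readers.FileFormat;
    import org.apache.pinot.spi.data.readers.RecordReader;
    import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;

    public class RecordReaderOwnershipSketch {
      public static void main(String[] args) throws Exception {
        File csvFile = new File("/tmp/dimBaseballTeams.csv");  // hypothetical data file

        // Config-owned: the reader is created lazily on getRecordReader() and closed by the config.
        RecordReaderFileConfig owned = new RecordReaderFileConfig(FileFormat.CSV, csvFile, null, null);
        RecordReader ownedReader = owned.getRecordReader();
        // ... map rows from 'ownedReader' ...
        owned.closeRecordReader();  // closes the reader because the config created it

        // Delegate: the caller supplies (and must close) the reader; closeRecordReader() is a no-op.
        RecordReader callerReader =
            new RecordReaderFileConfig(FileFormat.CSV, csvFile, null, null).getRecordReader();
        RecordReaderFileConfig delegate = new RecordReaderFileConfig(callerReader);
        delegate.closeRecordReader();  // does not close callerReader
        callerReader.close();          // the caller remains responsible
      }
    }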
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]