This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch segment-merge-fix in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 333833a9d6d16ffa4c233009fc56da2647e17d0e Author: Subbu Subramaniam <[email protected]> AuthorDate: Tue Dec 11 20:44:53 2018 -0800 Fix segment merge command. Some old Pinot segments were allowed to have 0 documents. Including those empty segments when computing the start/end time causes minStartTime to become 0, which is an invalid value if auto-naming of segments is chosen. Also, the multi-reader did not account for the fact that some segments may be smaller than others, so it needs to iterate through all segments before throwing an exception. --- .../readers/MultiplePinotSegmentRecordReader.java | 13 +++++----- .../segment/converter/SegmentMergeCommand.java | 28 ++++++++++++++-------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java index 3b41af1..5a7f142 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java @@ -147,15 +147,14 @@ public class MultiplePinotSegmentRecordReader implements RecordReader { return reuse; } else { // If there is no sorted column specified, simply concatenate the segments - PinotSegmentRecordReader currentReader = _pinotSegmentRecordReaders.get(_currentReaderId); - if (!currentReader.hasNext()) { - _currentReaderId++; - if (_currentReaderId >= _pinotSegmentRecordReaders.size()) { - throw new RuntimeException("next is called after reading all data"); + for (int i = 0; i < _pinotSegmentRecordReaders.size(); + i++, _currentReaderId = (_currentReaderId + 1) % _pinotSegmentRecordReaders.size()) { + PinotSegmentRecordReader currentReader = _pinotSegmentRecordReaders.get(_currentReaderId); + if (currentReader.hasNext()) { + return currentReader.next(reuse); } - currentReader = 
_pinotSegmentRecordReaders.get(_currentReaderId); } - return currentReader.next(reuse); + throw new RuntimeException("next is called after reading all data"); } } diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java index 205231e..0575684 100644 --- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java +++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import com.google.common.base.Preconditions; @@ -155,18 +156,25 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com long minStartTime = Long.MAX_VALUE; long maxEndTime = Long.MIN_VALUE; long totalNumDocsBeforeMerge = 0L; - for (File indexDir : inputIndexDirs) { + Iterator<File> it = inputIndexDirs.iterator(); + while (it.hasNext()) { + File indexDir = it.next(); SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir); - long currentStartTime = segmentMetadata.getStartTime(); - if (currentStartTime < minStartTime) { - minStartTime = currentStartTime; - } - - long currentEndTime = segmentMetadata.getEndTime(); - if (currentEndTime > maxEndTime) { - maxEndTime = currentEndTime; + if (segmentMetadata.getTotalDocs() > 0) { + long currentStartTime = segmentMetadata.getStartTime(); + if (currentStartTime < minStartTime) { + minStartTime = currentStartTime; + } + + long currentEndTime = segmentMetadata.getEndTime(); + if (currentEndTime > maxEndTime) { + maxEndTime = currentEndTime; + } + totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs(); + } else { + LOGGER.info("Discarding segment {} since it has 0 records", segmentMetadata.getName()); + 
it.remove(); } - totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs(); } // Compute segment name if it is not specified --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
