This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d48e4fd Fix segment merge command. (#3611)
d48e4fd is described below
commit d48e4fd21ecd0197f48a65aaf19192c52df015fc
Author: Subbu Subramaniam <[email protected]>
AuthorDate: Wed Dec 12 09:02:55 2018 -0800
Fix segment merge command. (#3611)
Some old Pinot segments were allowed to contain 0 documents. Including those
segments when computing the merged start/end time causes minStartTime to
become 0, which is an invalid value if auto-naming of segments is chosen.
Also, the multi-reader did not account for the fact that some segments may be
smaller than others, so it must iterate through all segments before throwing
an exception.
---
.../readers/MultiplePinotSegmentRecordReader.java | 13 +++++-----
.../segment/converter/SegmentMergeCommand.java | 28 ++++++++++++++--------
2 files changed, 24 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 3b41af1..5a7f142 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -147,15 +147,14 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
return reuse;
} else {
// If there is no sorted column specified, simply concatenate the
segments
- PinotSegmentRecordReader currentReader =
_pinotSegmentRecordReaders.get(_currentReaderId);
- if (!currentReader.hasNext()) {
- _currentReaderId++;
- if (_currentReaderId >= _pinotSegmentRecordReaders.size()) {
- throw new RuntimeException("next is called after reading all data");
+ for (int i = 0; i < _pinotSegmentRecordReaders.size();
+ i++, _currentReaderId = (_currentReaderId + 1) %
_pinotSegmentRecordReaders.size()) {
+ PinotSegmentRecordReader currentReader =
_pinotSegmentRecordReaders.get(_currentReaderId);
+ if (currentReader.hasNext()) {
+ return currentReader.next(reuse);
}
- currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
}
- return currentReader.next(reuse);
+ throw new RuntimeException("next is called after reading all data");
}
}
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
index 205231e..0575684 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import com.google.common.base.Preconditions;
@@ -155,18 +156,25 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
long minStartTime = Long.MAX_VALUE;
long maxEndTime = Long.MIN_VALUE;
long totalNumDocsBeforeMerge = 0L;
- for (File indexDir : inputIndexDirs) {
+ Iterator<File> it = inputIndexDirs.iterator();
+ while (it.hasNext()) {
+ File indexDir = it.next();
SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
- long currentStartTime = segmentMetadata.getStartTime();
- if (currentStartTime < minStartTime) {
- minStartTime = currentStartTime;
- }
-
- long currentEndTime = segmentMetadata.getEndTime();
- if (currentEndTime > maxEndTime) {
- maxEndTime = currentEndTime;
+ if (segmentMetadata.getTotalDocs() > 0) {
+ long currentStartTime = segmentMetadata.getStartTime();
+ if (currentStartTime < minStartTime) {
+ minStartTime = currentStartTime;
+ }
+
+ long currentEndTime = segmentMetadata.getEndTime();
+ if (currentEndTime > maxEndTime) {
+ maxEndTime = currentEndTime;
+ }
+ totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
+ } else {
+ LOGGER.info("Discarding segment {} since it has 0 records",
segmentMetadata.getName());
+ it.remove();
}
- totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
}
// Compute segment name if it is not specified
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]