This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d48e4fd  Fix segment merge command. (#3611)
d48e4fd is described below

commit d48e4fd21ecd0197f48a65aaf19192c52df015fc
Author: Subbu Subramaniam <[email protected]>
AuthorDate: Wed Dec 12 09:02:55 2018 -0800

    Fix segment merge command. (#3611)
    
    Some old pinot segments were allowed with 0 documents. Considering those segments for start/end time
    causes minStartTime to become 0, an invalid value if auto-naming of segments is chosen.
    
    Also, the multi-reader did not account for the fact that some segments may be smaller than others, so
    we need to iterate through all segments before throwing an exception.
---
 .../readers/MultiplePinotSegmentRecordReader.java  | 13 +++++-----
 .../segment/converter/SegmentMergeCommand.java     | 28 ++++++++++++++--------
 2 files changed, 24 insertions(+), 17 deletions(-)

diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 3b41af1..5a7f142 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -147,15 +147,14 @@ public class MultiplePinotSegmentRecordReader implements 
RecordReader {
       return reuse;
     } else {
       // If there is no sorted column specified, simply concatenate the 
segments
-      PinotSegmentRecordReader currentReader = 
_pinotSegmentRecordReaders.get(_currentReaderId);
-      if (!currentReader.hasNext()) {
-        _currentReaderId++;
-        if (_currentReaderId >= _pinotSegmentRecordReaders.size()) {
-          throw new RuntimeException("next is called after reading all data");
+      for (int i = 0; i < _pinotSegmentRecordReaders.size();
+          i++, _currentReaderId = (_currentReaderId + 1) % 
_pinotSegmentRecordReaders.size()) {
+        PinotSegmentRecordReader currentReader = 
_pinotSegmentRecordReaders.get(_currentReaderId);
+        if (currentReader.hasNext()) {
+          return currentReader.next(reuse);
         }
-        currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
       }
-      return currentReader.next(reuse);
+      throw new RuntimeException("next is called after reading all data");
     }
   }
 
diff --git 
a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
 
b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
index 205231e..0575684 100644
--- 
a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ 
b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -155,18 +156,25 @@ public class SegmentMergeCommand extends 
AbstractBaseAdminCommand implements Com
       long minStartTime = Long.MAX_VALUE;
       long maxEndTime = Long.MIN_VALUE;
       long totalNumDocsBeforeMerge = 0L;
-      for (File indexDir : inputIndexDirs) {
+      Iterator<File> it = inputIndexDirs.iterator();
+      while (it.hasNext()) {
+        File indexDir = it.next();
         SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
-        long currentStartTime = segmentMetadata.getStartTime();
-        if (currentStartTime < minStartTime) {
-          minStartTime = currentStartTime;
-        }
-
-        long currentEndTime = segmentMetadata.getEndTime();
-        if (currentEndTime > maxEndTime) {
-          maxEndTime = currentEndTime;
+        if (segmentMetadata.getTotalDocs() > 0) {
+          long currentStartTime = segmentMetadata.getStartTime();
+          if (currentStartTime < minStartTime) {
+            minStartTime = currentStartTime;
+          }
+
+          long currentEndTime = segmentMetadata.getEndTime();
+          if (currentEndTime > maxEndTime) {
+            maxEndTime = currentEndTime;
+          }
+          totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
+        } else {
+          LOGGER.info("Discarding segment {} since it has 0 records", 
segmentMetadata.getName());
+          it.remove();
         }
-        totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
       }
 
       // Compute segment name if it is not specified


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to