This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ed6761a982 Fix "rewind()" for CompactedPinotSegmentRecordReader (#12329)
ed6761a982 is described below

commit ed6761a982d4666c68b8fcda598a84571446c2ea
Author: Seunghyun Lee <[email protected]>
AuthorDate: Sat Jan 27 21:17:12 2024 -0800

    Fix "rewind()" for CompactedPinotSegmentRecordReader (#12329)
    
    * Fix "rewind()" for CompactedPinotSegmentRecordReader
    
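    - rewind() in the original CompactedPinotSegmentRecordReader did not
      re-initialize the valid doc ids iterator, so reads after rewind()
      resumed from wherever the previous pass had stopped instead of
      starting over. This PR recreates the iterator from the stored
      valid doc ids bitmap on rewind().
    - Added a unit test that checks rewind()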
    
    * Address review comments
---
 .../readers/CompactedPinotSegmentRecordReader.java |  11 +-
 .../CompactedPinotSegmentRecordReaderTest.java     | 131 +++++++++++++++++++++
 2 files changed, 139 insertions(+), 3 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
index 2795982ab6..07f82d837a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
@@ -35,15 +35,19 @@ import org.roaringbitmap.RoaringBitmap;
  */
 public class CompactedPinotSegmentRecordReader implements RecordReader {
   private final PinotSegmentRecordReader _pinotSegmentRecordReader;
-  private final PeekableIntIterator _validDocIdsIterator;
+  private final RoaringBitmap _validDocIdsBitmap;
+
+  // Valid doc ids iterator
+  private PeekableIntIterator _validDocIdsIterator;
   // Reusable generic row to store the next row to return
-  GenericRow _nextRow = new GenericRow();
+  private GenericRow _nextRow = new GenericRow();
   // Flag to mark whether we need to fetch another row
-  boolean _nextRowReturned = true;
+  private boolean _nextRowReturned = true;
 
   public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap 
validDocIds) {
     _pinotSegmentRecordReader = new PinotSegmentRecordReader();
     _pinotSegmentRecordReader.init(indexDir, null, null);
+    _validDocIdsBitmap = validDocIds;
     _validDocIdsIterator = validDocIds.getIntIterator();
   }
 
@@ -96,6 +100,7 @@ public class CompactedPinotSegmentRecordReader implements 
RecordReader {
       throws IOException {
     _pinotSegmentRecordReader.rewind();
     _nextRowReturned = true;
+    _validDocIdsIterator = _validDocIdsBitmap.getIntIterator();
   }
 
   @Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
new file mode 100644
index 0000000000..b9afc3ce88
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests {@link CompactedPinotSegmentRecordReader}: only rows whose doc ids are set in the valid doc ids bitmap are
+ * returned, and {@code rewind()} restarts iteration from the first valid doc id.
+ */
+public class CompactedPinotSegmentRecordReaderTest {
+  private static final int NUM_ROWS = 10000;
+  // Number of rows consumed before the mid-iteration rewind() check
+  private static final int NUM_ROWS_BEFORE_PARTIAL_REWIND = 10;
+  private static final String D_SV_1 = "d_sv_1";
+  private static final String D_MV_1 = "d_mv_1";
+  private static final String M1 = "m1";
+  private static final String M2 = "m2";
+  private static final String TIME = "t";
+
+  private String _segmentOutputDir;
+  private File _segmentIndexDir;
+  private List<GenericRow> _rows;
+  private RecordReader _recordReader;
+
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    Schema schema = createPinotSchema();
+    TableConfig tableConfig = createTableConfig();
+    String segmentName = "compactedPinotSegmentRecordReaderTest";
+    _segmentOutputDir = Files.createTempDirectory(segmentName).toString();
+    _rows = PinotSegmentUtil.createTestData(schema, NUM_ROWS);
+    _recordReader = new GenericRowRecordReader(_rows);
+    _segmentIndexDir =
+        PinotSegmentUtil.createSegment(tableConfig, schema, segmentName, _segmentOutputDir, _recordReader);
+  }
+
+  private Schema createPinotSchema() {
+    return new Schema.SchemaBuilder().setSchemaName("schema").addSingleValueDimension(D_SV_1, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(D_MV_1, FieldSpec.DataType.STRING).addMetric(M1, FieldSpec.DataType.INT)
+        .addMetric(M2, FieldSpec.DataType.FLOAT)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, TIME), null).build();
+  }
+
+  private TableConfig createTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME).build();
+  }
+
+  @Test
+  public void testCompactedPinotSegmentRecordReader()
+      throws Exception {
+    // Only even doc ids (0, 2, 4, ...) are valid
+    RoaringBitmap validDocIds = new RoaringBitmap();
+    for (int i = 0; i < NUM_ROWS; i += 2) {
+      validDocIds.add(i);
+    }
+
+    List<GenericRow> outputRows;
+    List<GenericRow> rewoundOutputRows;
+    List<GenericRow> partiallyRewoundOutputRows;
+    try (CompactedPinotSegmentRecordReader compactedReader =
+        new CompactedPinotSegmentRecordReader(_segmentIndexDir, validDocIds)) {
+      outputRows = readAllRows(compactedReader);
+
+      // rewind() after a full pass must restart from the first valid doc id
+      compactedReader.rewind();
+      rewoundOutputRows = readAllRows(compactedReader);
+
+      // rewind() in the middle of a pass must also restart from the first valid doc id
+      compactedReader.rewind();
+      for (int i = 0; i < NUM_ROWS_BEFORE_PARTIAL_REWIND && compactedReader.hasNext(); i++) {
+        compactedReader.next();
+      }
+      compactedReader.rewind();
+      partiallyRewoundOutputRows = readAllRows(compactedReader);
+    }
+
+    assertEveryOtherInputRow(outputRows, "initial pass");
+    assertEveryOtherInputRow(rewoundOutputRows, "pass after full rewind");
+    assertEveryOtherInputRow(partiallyRewoundOutputRows, "pass after partial rewind");
+  }
+
+  /**
+   * Reads every remaining record from {@code recordReader}, in order, into a new list.
+   */
+  private static List<GenericRow> readAllRows(RecordReader recordReader)
+      throws IOException {
+    List<GenericRow> rows = new ArrayList<>();
+    while (recordReader.hasNext()) {
+      rows.add(recordReader.next());
+    }
+    return rows;
+  }
+
+  /**
+   * Asserts that {@code outputRows} contains exactly the input rows at even doc ids, in doc id order.
+   *
+   * @param outputRows rows returned by the reader under test
+   * @param passName label for the read pass, included in the row-count failure message
+   */
+  private void assertEveryOtherInputRow(List<GenericRow> outputRows, String passName) {
+    Assert.assertEquals(outputRows.size(), NUM_ROWS / 2,
+        "Number of rows returned by CompactedPinotSegmentRecordReader is incorrect for " + passName);
+    for (int i = 0; i < outputRows.size(); i++) {
+      GenericRow outputRow = outputRows.get(i);
+      GenericRow row = _rows.get(i * 2);
+      Assert.assertEquals(outputRow.getValue(D_SV_1), row.getValue(D_SV_1));
+      Assert.assertTrue(PinotSegmentUtil.compareMultiValueColumn(outputRow.getValue(D_MV_1), row.getValue(D_MV_1)));
+      Assert.assertEquals(outputRow.getValue(M1), row.getValue(M1));
+      Assert.assertEquals(outputRow.getValue(M2), row.getValue(M2));
+      Assert.assertEquals(outputRow.getValue(TIME), row.getValue(TIME));
+    }
+  }
+
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(_segmentOutputDir));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to