This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ed6761a982 Fix "rewind()" for CompactedPinotSegmentRecordReader
(#12329)
ed6761a982 is described below
commit ed6761a982d4666c68b8fcda598a84571446c2ea
Author: Seunghyun Lee <[email protected]>
AuthorDate: Sat Jan 27 21:17:12 2024 -0800
Fix "rewind()" for CompactedPinotSegmentRecordReader (#12329)
* Fix "rewind()" for CompactedPinotSegmentRecordReader
- The original implementation of CompactedPinotSegmentRecordReader
had a bug in rewind(): it did not re-initialize the validDocIds
iterator. This PR fixes the issue.
- Added a unit test for rewind()
* Addressing the comments
---
.../readers/CompactedPinotSegmentRecordReader.java | 11 +-
.../CompactedPinotSegmentRecordReaderTest.java | 131 +++++++++++++++++++++
2 files changed, 139 insertions(+), 3 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
index 2795982ab6..07f82d837a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
@@ -35,15 +35,19 @@ import org.roaringbitmap.RoaringBitmap;
*/
public class CompactedPinotSegmentRecordReader implements RecordReader {
private final PinotSegmentRecordReader _pinotSegmentRecordReader;
- private final PeekableIntIterator _validDocIdsIterator;
+ private final RoaringBitmap _validDocIdsBitmap;
+
+ // Valid doc ids iterator
+ private PeekableIntIterator _validDocIdsIterator;
// Reusable generic row to store the next row to return
- GenericRow _nextRow = new GenericRow();
+ private GenericRow _nextRow = new GenericRow();
// Flag to mark whether we need to fetch another row
- boolean _nextRowReturned = true;
+ private boolean _nextRowReturned = true;
public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap
validDocIds) {
_pinotSegmentRecordReader = new PinotSegmentRecordReader();
_pinotSegmentRecordReader.init(indexDir, null, null);
+ _validDocIdsBitmap = validDocIds;
_validDocIdsIterator = validDocIds.getIntIterator();
}
@@ -96,6 +100,7 @@ public class CompactedPinotSegmentRecordReader implements
RecordReader {
throws IOException {
_pinotSegmentRecordReader.rewind();
_nextRowReturned = true;
+ _validDocIdsIterator = _validDocIdsBitmap.getIntIterator();
}
@Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
new file mode 100644
index 0000000000..b9afc3ce88
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests {@link CompactedPinotSegmentRecordReader} on a segment built from generated rows: only rows whose doc ids
+ * are in the supplied bitmap are returned, and the same rows are returned again after {@code rewind()}.
+ */
+public class CompactedPinotSegmentRecordReaderTest {
+  private static final int NUM_ROWS = 10000;
+  private static final String D_SV_1 = "d_sv_1";
+  private static final String D_MV_1 = "d_mv_1";
+  private static final String M1 = "m1";
+  private static final String M2 = "m2";
+  private static final String TIME = "t";
+
+  private String _segmentOutputDir;
+  private File _segmentIndexDir;
+  // Input rows, in the order they were written into the segment
+  private List<GenericRow> _rows;
+
+  /**
+   * Generates {@link #NUM_ROWS} test rows and builds a segment from them in a fresh temporary directory.
+   */
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    Schema schema = createPinotSchema();
+    TableConfig tableConfig = createTableConfig();
+    String segmentName = "compactedPinotSegmentRecordReaderTest";
+    _segmentOutputDir = Files.createTempDir().toString();
+    _rows = PinotSegmentUtil.createTestData(schema, NUM_ROWS);
+    RecordReader recordReader = new GenericRowRecordReader(_rows);
+    _segmentIndexDir =
+        PinotSegmentUtil.createSegment(tableConfig, schema, segmentName, _segmentOutputDir, recordReader);
+  }
+
+  private Schema createPinotSchema() {
+    return new Schema.SchemaBuilder().setSchemaName("schema")
+        .addSingleValueDimension(D_SV_1, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(D_MV_1, FieldSpec.DataType.STRING)
+        .addMetric(M1, FieldSpec.DataType.INT)
+        .addMetric(M2, FieldSpec.DataType.FLOAT)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, TIME), null)
+        .build();
+  }
+
+  private TableConfig createTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME).build();
+  }
+
+  /**
+   * Reads the segment through a compacted reader that keeps only even doc ids. It then rewinds and reads again,
+   * and checks that both passes return exactly the even-indexed input rows.
+   */
+  @Test
+  public void testCompactedPinotSegmentRecordReader()
+      throws Exception {
+    // Keep every other document: doc ids 0, 2, 4, ...
+    RoaringBitmap validDocIds = new RoaringBitmap();
+    for (int i = 0; i < NUM_ROWS; i += 2) {
+      validDocIds.add(i);
+    }
+
+    List<GenericRow> outputRows;
+    List<GenericRow> rewoundOutputRows;
+    CompactedPinotSegmentRecordReader compactedReader =
+        new CompactedPinotSegmentRecordReader(_segmentIndexDir, validDocIds);
+    try {
+      outputRows = readAllRows(compactedReader);
+      // Regression check: rewind() must make the valid rows readable again from the start
+      compactedReader.rewind();
+      rewoundOutputRows = readAllRows(compactedReader);
+    } finally {
+      // Release the reader even if reading or rewinding fails
+      compactedReader.close();
+    }
+
+    assertRowsMatchEvenInputRows(outputRows);
+    assertRowsMatchEvenInputRows(rewoundOutputRows);
+  }
+
+  /**
+   * Collects every remaining row from {@code reader}, starting at its current position.
+   */
+  private static List<GenericRow> readAllRows(CompactedPinotSegmentRecordReader reader)
+      throws Exception {
+    List<GenericRow> rows = new ArrayList<>();
+    while (reader.hasNext()) {
+      rows.add(reader.next());
+    }
+    return rows;
+  }
+
+  /**
+   * Asserts that {@code actualRows} contains exactly the input rows at even indexes (0, 2, 4, ...), in order.
+   */
+  private void assertRowsMatchEvenInputRows(List<GenericRow> actualRows) {
+    Assert.assertEquals(actualRows.size(), NUM_ROWS / 2,
+        "Number of _rows returned by CompactedPinotSegmentRecordReader is incorrect");
+    for (int i = 0; i < actualRows.size(); i++) {
+      GenericRow actualRow = actualRows.get(i);
+      GenericRow expectedRow = _rows.get(i * 2);
+      Assert.assertEquals(actualRow.getValue(D_SV_1), expectedRow.getValue(D_SV_1));
+      Assert.assertTrue(
+          PinotSegmentUtil.compareMultiValueColumn(actualRow.getValue(D_MV_1), expectedRow.getValue(D_MV_1)));
+      Assert.assertEquals(actualRow.getValue(M1), expectedRow.getValue(M1));
+      Assert.assertEquals(actualRow.getValue(M2), expectedRow.getValue(M2));
+      Assert.assertEquals(actualRow.getValue(TIME), expectedRow.getValue(TIME));
+    }
+  }
+
+  /**
+   * Deletes the temporary segment output directory created in {@link #setup()}.
+   */
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(_segmentOutputDir));
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]