This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e1e54cf Generate inverted index in purge task if it exists (#4182)
e1e54cf is described below
commit e1e54cf9e6365641d0e08c87af22b3360938c102
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 2 13:45:28 2019 -0700
Generate inverted index in purge task if it exists (#4182)
If the purge task does not generate the inverted index, the server needs
to generate it while loading the segment, which can potentially
cause performance issues.
TODO: unify the behavior of Pinot Hadoop segment generation, the segment
converter and the purger
---
.../apache/pinot/core/minion/SegmentPurger.java | 30 ++++++++---
.../pinot/core/minion/SegmentPurgerTest.java | 61 +++++++++++++---------
2 files changed, 59 insertions(+), 32 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 9581310..3e95b7e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -21,12 +21,14 @@ package org.apache.pinot.core.minion;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.segment.StarTreeMetadata;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
@@ -34,6 +36,8 @@ import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +69,7 @@ public class SegmentPurger {
public File purgeSegment()
throws Exception {
- SegmentMetadata segmentMetadata = new
SegmentMetadataImpl(_originalIndexDir);
+ SegmentMetadataImpl segmentMetadata = new
SegmentMetadataImpl(_originalIndexDir);
String tableName = segmentMetadata.getTableName();
String segmentName = segmentMetadata.getName();
LOGGER.info("Start purging table: {}, segment: {}", tableName,
segmentName);
@@ -81,7 +85,8 @@ public class SegmentPurger {
return null;
}
- SegmentGeneratorConfig config = new
SegmentGeneratorConfig(purgeRecordReader.getSchema());
+ Schema schema = purgeRecordReader.getSchema();
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
config.setOutDir(_workingDir.getPath());
config.setTableName(tableName);
config.setSegmentName(segmentName);
@@ -100,14 +105,28 @@ public class SegmentPurger {
config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
}
+ // Generate inverted index if it exists in the original segment
+ // TODO: once the column metadata correctly reflects whether inverted
index exists for the column, use that
+ // instead of reading the segment
+ // TODO: uniform the behavior of Pinot Hadoop segment generation,
segment converter and purger
+ List<String> invertedIndexCreationColumns = new ArrayList<>();
+ try (SegmentDirectory segmentDirectory = SegmentDirectory
+ .createFromLocalFS(_originalIndexDir, segmentMetadata,
ReadMode.mmap);
+ SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+ for (String column : schema.getColumnNames()) {
+ if (reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX)) {
+ invertedIndexCreationColumns.add(column);
+ }
+ }
+ }
+ config.setInvertedIndexCreationColumns(invertedIndexCreationColumns);
+
// Generate star-tree if it exists in the original segment
StarTreeMetadata starTreeMetadata =
segmentMetadata.getStarTreeMetadata();
if (starTreeMetadata != null) {
config.enableStarTreeIndex(StarTreeIndexSpec.fromStarTreeMetadata(starTreeMetadata));
}
- // TODO: currently we don't generate inverted index
-
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
purgeRecordReader.rewind();
driver.init(config, purgeRecordReader);
@@ -152,7 +171,6 @@ public class SegmentPurger {
@Override
public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
-
}
@Override
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index c8b922b..81e41ac 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -20,23 +20,31 @@ package org.apache.pinot.core.minion;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
-import org.testng.Assert;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
public class SegmentPurgerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"SegmentPurgerTest");
@@ -83,6 +91,7 @@ public class SegmentPurgerTest {
config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
config.setTableName(TABLE_NAME);
config.setSegmentName(SEGMENT_NAME);
+ config.setInvertedIndexCreationColumns(Collections.singletonList(D1));
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(config, genericRowRecordReader);
@@ -94,23 +103,15 @@ public class SegmentPurgerTest {
public void testPurgeSegment()
throws Exception {
// Purge records with d1 = 0
- SegmentPurger.RecordPurger recordPurger = new SegmentPurger.RecordPurger()
{
- @Override
- public boolean shouldPurge(GenericRow row) {
- return row.getValue(D1).equals(0);
- }
- };
-
- // Modify records with d2 = 0 to d2 = 100
- SegmentPurger.RecordModifier recordModifier = new
SegmentPurger.RecordModifier() {
- @Override
- public boolean modifyRecord(GenericRow row) {
- if (row.getValue(D2).equals(0)) {
- row.putField(D2, Integer.MAX_VALUE);
- return true;
- } else {
- return false;
- }
+ SegmentPurger.RecordPurger recordPurger = row ->
row.getValue(D1).equals(0);
+
+ // Modify records with d2 = 0 to d2 = Integer.MAX_VALUE
+ SegmentPurger.RecordModifier recordModifier = row -> {
+ if (row.getValue(D2).equals(0)) {
+ row.putField(D2, Integer.MAX_VALUE);
+ return true;
+ } else {
+ return false;
}
};
@@ -119,14 +120,14 @@ public class SegmentPurgerTest {
File purgedIndexDir = segmentPurger.purgeSegment();
// Check the purge/modify counter in segment purger
- Assert.assertEquals(segmentPurger.getNumRecordsPurged(),
_expectedNumRecordsPurged);
- Assert.assertEquals(segmentPurger.getNumRecordsModified(),
_expectedNumRecordsModified);
+ assertEquals(segmentPurger.getNumRecordsPurged(),
_expectedNumRecordsPurged);
+ assertEquals(segmentPurger.getNumRecordsModified(),
_expectedNumRecordsModified);
// Check crc and index creation time
SegmentMetadataImpl purgedSegmentMetadata = new
SegmentMetadataImpl(purgedIndexDir);
SegmentMetadataImpl originalSegmentMetadata = new
SegmentMetadataImpl(_originalIndexDir);
-
Assert.assertFalse(purgedSegmentMetadata.getCrc().equals(originalSegmentMetadata.getCrc()));
- Assert.assertEquals(purgedSegmentMetadata.getIndexCreationTime(),
originalSegmentMetadata.getIndexCreationTime());
+ assertNotEquals(purgedSegmentMetadata.getCrc(),
originalSegmentMetadata.getCrc());
+ assertEquals(purgedSegmentMetadata.getIndexCreationTime(),
originalSegmentMetadata.getIndexCreationTime());
try (PinotSegmentRecordReader pinotSegmentRecordReader = new
PinotSegmentRecordReader(purgedIndexDir)) {
int numRecordsRemaining = 0;
@@ -137,8 +138,8 @@ public class SegmentPurgerTest {
row = pinotSegmentRecordReader.next(row);
// Purged segment should not have any record with d1 = 0 or d2 = 0
- Assert.assertFalse(row.getValue(D1).equals(0));
- Assert.assertFalse(row.getValue(D2).equals(0));
+ assertNotEquals(row.getValue(D1), 0);
+ assertNotEquals(row.getValue(D2), 0);
numRecordsRemaining++;
if (row.getValue(D2).equals(Integer.MAX_VALUE)) {
@@ -146,8 +147,16 @@ public class SegmentPurgerTest {
}
}
- Assert.assertEquals(numRecordsRemaining, NUM_ROWS -
_expectedNumRecordsPurged);
- Assert.assertEquals(numRecordsModified, _expectedNumRecordsModified);
+ assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
+ assertEquals(numRecordsModified, _expectedNumRecordsModified);
+ }
+
+ // Check inverted index
+ try (SegmentDirectory segmentDirectory = SegmentDirectory
+ .createFromLocalFS(purgedIndexDir, purgedSegmentMetadata,
ReadMode.mmap);
+ SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+ assertTrue(reader.hasIndexFor(D1, ColumnIndexType.INVERTED_INDEX));
+ assertFalse(reader.hasIndexFor(D2, ColumnIndexType.INVERTED_INDEX));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]