This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe8d39506b bugfix: use consumerDir during lucene realtime segment
conversion (#13094)
fe8d39506b is described below
commit fe8d39506bdb15f67a4f2d3977ced418d3b8b8b7
Author: Christopher Peck <[email protected]>
AuthorDate: Fri May 10 22:42:07 2024 -0700
bugfix: use consumerDir during lucene realtime segment conversion (#13094)
---
.../realtime/provisioning/MemoryEstimator.java | 6 ++++--
.../realtime/RealtimeSegmentDataManagerTest.java | 1 +
.../indexsegment/mutable/MutableSegmentImpl.java | 9 +++++++-
.../RealtimeSegmentSegmentCreationDataSource.java | 8 +++++++
.../local/realtime/impl/RealtimeSegmentConfig.java | 2 --
.../invertedindex/RealtimeLuceneTextIndex.java | 9 ++++----
.../creator/impl/SegmentColumnarIndexCreator.java | 1 +
.../impl/SegmentIndexCreationDriverImpl.java | 1 +
.../creator/impl/text/LuceneTextIndexCreator.java | 25 +++++++++++++---------
.../index/readers/text/LuceneTextIndexReader.java | 1 +
.../local/segment/index/text/TextIndexType.java | 3 ---
.../mutable/MutableSegmentImplTestUtils.java | 7 +++++-
.../converter/RealtimeSegmentConverterTest.java | 2 +-
.../invertedindex/LuceneMutableTextIndexTest.java | 2 +-
.../segment/store/FilePerIndexDirectoryTest.java | 12 +++++++----
.../store/SingleFileIndexDirectoryTest.java | 12 +++++++----
.../apache/pinot/segment/spi/MutableSegment.java | 6 ++++++
.../segment/spi/creator/IndexCreationContext.java | 22 +++++++++++++++++--
.../spi/creator/SegmentGeneratorConfig.java | 10 +++++++++
.../mutable/provider/MutableIndexContext.java | 3 ---
20 files changed, 104 insertions(+), 38 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
index 934b197560..a4b3db45a9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
@@ -163,7 +163,8 @@ public class MemoryEstimator {
.setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
.setSchema(_segmentMetadata.getSchema()).setCapacity(_segmentMetadata.getTotalDocs())
.setAvgNumMultiValues(_avgMultiValues).setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
-
.setMemoryManager(memoryManager).setStatsHistory(sampleStatsHistory);
+
.setMemoryManager(memoryManager).setStatsHistory(sampleStatsHistory)
+ .setConsumerDir(_workingDir.getAbsolutePath());
// create mutable segment impl
MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
@@ -326,7 +327,8 @@ public class MemoryEstimator {
.setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
.setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
.setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
- .setMemoryManager(memoryManager).setStatsHistory(statsHistory);
+ .setMemoryManager(memoryManager).setStatsHistory(statsHistory)
+ .setConsumerDir(_workingDir.getAbsolutePath());
// create mutable segment impl
MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index cc93fcfe77..088e270a32 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -106,6 +106,7 @@ public class RealtimeSegmentDataManagerTest {
when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
when(tableDataManager.getStatsHistory()).thenReturn(statsHistory);
+
when(tableDataManager.getConsumerDir()).thenReturn(TEMP_DIR.getAbsolutePath() +
"/consumerDir");
return tableDataManager;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index b336b30f20..38bd1921c6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -140,6 +140,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final PartitionFunction _partitionFunction;
private final int _mainPartitionId; // partition id designated for this
consuming segment
private final boolean _nullHandlingEnabled;
+ private final File _consumerDir;
private final Map<String, IndexContainer> _indexContainerMap = new
HashMap<>();
@@ -216,6 +217,7 @@ public class MutableSegmentImpl implements MutableSegment {
_partitionFunction = config.getPartitionFunction();
_mainPartitionId = config.getPartitionId();
_nullHandlingEnabled = config.isNullHandlingEnabled();
+ _consumerDir = new File(config.getConsumerDir());
Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -283,7 +285,7 @@ public class MutableSegmentImpl implements MutableSegment {
.withEstimatedCardinality(_statsHistory.getEstimatedCardinality(column))
.withEstimatedColSize(_statsHistory.getEstimatedAvgColSize(column))
.withAvgNumMultiValues(_statsHistory.getEstimatedAvgColSize(column))
- .withConsumerDir(config.getConsumerDir() != null ? new
File(config.getConsumerDir()) : null)
+ .withConsumerDir(_consumerDir)
.withFixedLengthBytes(fixedByteSize).build();
// Partition info
@@ -852,6 +854,11 @@ public class MutableSegmentImpl implements MutableSegment {
return _numDocsIndexed;
}
+ @Override
+ public File getConsumerDir() {
+ return _consumerDir;
+ }
+
@Override
public String getSegmentName() {
return _segmentName;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
index e795dd05a6..a8c521ddbc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.realtime.converter.stats;
+import java.io.File;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource;
@@ -48,4 +49,11 @@ public class RealtimeSegmentSegmentCreationDataSource
implements SegmentCreation
public RecordReader getRecordReader() {
return _recordReader;
}
+
+ /**
+ * Returns the consumer directory of the realtime segment
+ */
+ public File getConsumerDir() {
+ return _mutableSegment.getConsumerDir();
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index a95a68893c..956135f7ca 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -179,7 +178,6 @@ public class RealtimeSegmentConfig {
return _nullHandlingEnabled;
}
- @Nullable
public String getConsumerDir() {
return _consumerDir;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index 8d2e43c8a5..751bff517a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -78,7 +78,7 @@ public class RealtimeLuceneTextIndex implements
MutableTextIndex {
// for realtime
_indexCreator =
new LuceneTextIndexCreator(column, new
File(segmentIndexDir.getAbsolutePath() + "/" + segmentName),
- false /* commitOnClose */, true, null, config);
+ false /* commitOnClose */, false, null, null, config);
IndexWriter indexWriter = _indexCreator.getIndexWriter();
_searcherManager = new SearcherManager(indexWriter, false, false, null);
_analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
@@ -151,9 +151,9 @@ public class RealtimeLuceneTextIndex implements
MutableTextIndex {
return searchFuture.get();
} catch (InterruptedException e) {
docIDCollector.markShouldCancel();
- LOGGER.warn("TEXT_MATCH query timeout on realtime consuming segment {},
column {}, search query {}", _segmentName,
- _column, searchQuery);
- throw new RuntimeException("TEXT_MATCH query timeout on realtime
consuming segment");
+ LOGGER.warn("TEXT_MATCH query interrupted while querying the consuming
segment {}, column {}, search query {}",
+ _segmentName, _column, searchQuery);
+ throw new RuntimeException("TEXT_MATCH query interrupted while querying
the consuming segment");
} catch (Exception e) {
LOGGER.error("Failed while searching the realtime text index for segment
{}, column {}, search query {},"
+ " exception {}", _segmentName, _column, searchQuery,
e.getMessage());
@@ -198,6 +198,7 @@ public class RealtimeLuceneTextIndex implements
MutableTextIndex {
_searcherManager.close();
_searcherManager = null;
_indexCreator.close();
+ _analyzer.close();
} catch (Exception e) {
LOGGER.error("Failed while closing the realtime text index for column
{}, exception {}", _column, e.getMessage());
throw new RuntimeException(e);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 1483191b7f..8406226d85 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -163,6 +163,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
.withTextCommitOnClose(true)
.withImmutableToMutableIdMap(immutableToMutableIdMap)
.withRealtimeConversion(segmentCreationSpec.isRealtimeConversion())
+ .withConsumerDir(segmentCreationSpec.getConsumerDir())
.build();
//@formatter:on
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index ecfea58ca7..f75afb5b15 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -195,6 +195,7 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
// Optimization for realtime segment conversion
if (dataSource instanceof RealtimeSegmentSegmentCreationDataSource) {
_config.setRealtimeConversion(true);
+ _config.setConsumerDir(((RealtimeSegmentSegmentCreationDataSource)
dataSource).getConsumerDir());
}
// Initialize stats collection
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index c24778ab37..1e9581980d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -89,6 +89,8 @@ public class LuceneTextIndexCreator extends
AbstractTextIndexCreator {
* @param segmentIndexDir segment index directory
* @param commit true if the index should be committed (at the end after all
documents have
* been added), false if index should not be committed
+ * @param realtimeConversion index creator should create an index using the
realtime segment
+ * @param consumerDir consumer dir containing the realtime index, used when
realtimeConversion and commit are both true
* @param immutableToMutableIdMap immutableToMutableIdMap from segment
conversion
* Note on commit:
* Once {@link SegmentColumnarIndexCreator}
@@ -106,7 +108,7 @@ public class LuceneTextIndexCreator extends
AbstractTextIndexCreator {
* @param config the text index config
*/
public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean
commit, boolean realtimeConversion,
- @Nullable int[] immutableToMutableIdMap, TextIndexConfig config) {
+ @Nullable File consumerDir, @Nullable int[] immutableToMutableIdMap,
TextIndexConfig config) {
_textColumn = column;
_commitOnClose = commit;
@@ -144,7 +146,7 @@ public class LuceneTextIndexCreator extends
AbstractTextIndexCreator {
if (_reuseMutableIndex) {
LOGGER.info("Reusing the realtime lucene index for segment {} and
column {}", segmentIndexDir, column);
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
- convertMutableSegment(segmentIndexDir, immutableToMutableIdMap,
indexWriterConfig);
+ convertMutableSegment(segmentIndexDir, consumerDir,
immutableToMutableIdMap, indexWriterConfig);
return;
}
@@ -161,7 +163,7 @@ public class LuceneTextIndexCreator extends
AbstractTextIndexCreator {
public LuceneTextIndexCreator(IndexCreationContext context, TextIndexConfig
indexConfig) {
this(context.getFieldSpec().getName(), context.getIndexDir(),
context.isTextCommitOnClose(),
- context.isRealtimeConversion(), context.getImmutableToMutableIdMap(),
indexConfig);
+ context.isRealtimeConversion(), context.getConsumerDir(),
context.getImmutableToMutableIdMap(), indexConfig);
}
public IndexWriter getIndexWriter() {
@@ -174,12 +176,12 @@ public class LuceneTextIndexCreator extends
AbstractTextIndexCreator {
* @param immutableToMutableIdMap immutableToMutableIdMap from segment
conversion
* @param indexWriterConfig indexWriterConfig
*/
- private void convertMutableSegment(File segmentIndexDir, @Nullable int[]
immutableToMutableIdMap,
+ private void convertMutableSegment(File segmentIndexDir, File consumerDir,
@Nullable int[] immutableToMutableIdMap,
IndexWriterConfig indexWriterConfig) {
try {
// Copy the mutable index to the v1 index location
File dest = getV1TextIndexFile(segmentIndexDir);
- File mutableDir = getMutableIndexDir(segmentIndexDir);
+ File mutableDir = getMutableIndexDir(segmentIndexDir, consumerDir);
FileUtils.copyDirectory(mutableDir, dest);
// Remove the copied write.lock file
@@ -344,12 +346,15 @@ public class LuceneTextIndexCreator extends
AbstractTextIndexCreator {
return new File(indexDir, luceneIndexDirectory);
}
- private File getMutableIndexDir(File indexDir) {
+ private File getMutableIndexDir(File indexDir, File consumerDir) {
+ String segmentName = getSegmentName(indexDir);
+ return new File(new File(consumerDir, segmentName),
+ _textColumn +
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
+ }
+
+ private String getSegmentName(File indexDir) {
// tmpSegmentName format: tmp-tableName__9__1__20240227T0254Z-1709002522086
String tmpSegmentName = indexDir.getParentFile().getName();
- String segmentName =
tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4,
tmpSegmentName.lastIndexOf('-'));
- String mutableDir = indexDir.getParentFile().getParentFile().getParent() +
"/consumers/" + segmentName + "/"
- + _textColumn +
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION;
- return new File(mutableDir);
+ return tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4,
tmpSegmentName.lastIndexOf('-'));
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
index 07eb52f88b..ed027903b9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
@@ -188,6 +188,7 @@ public class LuceneTextIndexReader implements
TextIndexReader {
_indexReader.close();
_indexDirectory.close();
_docIdTranslator.close();
+ _analyzer.close();
}
/**
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
index cfbf6271f1..7a936aac76 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
@@ -194,9 +194,6 @@ public class TextIndexType extends
AbstractIndexType<TextIndexConfig, TextIndexR
if (config.getFstType() == FSTType.NATIVE) {
return new NativeMutableTextIndex(context.getFieldSpec().getName());
}
- if (context.getConsumerDir() == null) {
- throw new IllegalArgumentException("A consumer directory is required");
- }
return new RealtimeLuceneTextIndex(context.getFieldSpec().getName(),
context.getConsumerDir(),
context.getSegmentName(), config);
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index ade22fcad6..6a8ca27480 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.segment.local.indexsegment.mutable;
+import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -49,6 +52,7 @@ public class MutableSegmentImplTestUtils {
private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
private static final String SEGMENT_NAME = "testSegment__0__0__155555";
private static final String STREAM_NAME = "testStream";
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"MutableSegmentImplTestUtils");
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema,
Set<String> noDictionaryColumns,
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
@@ -118,7 +122,8 @@ public class MutableSegmentImplTestUtils {
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setIngestionAggregationConfigs(aggregationConfigs)
.setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord)
- .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn);
+ .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn)
+ .setConsumerDir(TEMP_DIR.getAbsolutePath() + "/" + UUID.randomUUID() +
"/consumerDir");
for (Map.Entry<String, JsonIndexConfig> entry :
jsonIndexConfigs.entrySet()) {
segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(),
entry.getValue());
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index e4ed4bb396..033acc7dbf 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -484,7 +484,7 @@ public class RealtimeSegmentConverterTest {
.setFieldConfigList(fieldConfigList).setSegmentZKMetadata(getSegmentZKMetadata(segmentName))
.setOffHeap(true).setMemoryManager(new
DirectMemoryManager(segmentName))
.setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new
File(tmpDir, "stats")))
- .setConsumerDir(new File(tmpDir, "consumers").getAbsolutePath());
+ .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
// create mutable segment impl
RealtimeLuceneTextIndexSearcherPool.init(1);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
index b180fd0c4e..5d1a2440a5 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
@@ -96,7 +96,7 @@ public class LuceneMutableTextIndexTest {
}
@Test(expectedExceptions = ExecutionException.class,
- expectedExceptionsMessageRegExp = ".*TEXT_MATCH query timeout on
realtime consuming segment.*")
+ expectedExceptionsMessageRegExp = ".*TEXT_MATCH query interrupted while
querying the consuming segment.*")
public void testQueryCancellationIsSuccessful()
throws InterruptedException, ExecutionException {
// Avoid early finalization by not using Executors.newSingleThreadExecutor
(java <= 20, JDK-8145304)
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
index a385a60b03..97e12bc321 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
@@ -204,8 +204,10 @@ public class FilePerIndexDirectoryTest {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, null,
+ config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, null,
+ config)) {
PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 1);
@@ -267,8 +269,10 @@ public class FilePerIndexDirectoryTest {
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
// Write sth to buffers and flush them to index files on disk
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, null,
+ config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, null,
+ config)) {
PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 111);
buf = fpi.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
index 3a94ceec11..b962550e3b 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
@@ -237,8 +237,10 @@ public class SingleFileIndexDirectoryTest {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, null,
+ config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, null,
+ config)) {
PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 1);
@@ -343,8 +345,10 @@ public class SingleFileIndexDirectoryTest {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null,
true, 500, null, false);
try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR,
_segmentMetadata, ReadMode.mmap);
- LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, config);
- LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, config)) {
+ LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo",
TEMP_DIR, true, false, null, null,
+ config);
+ LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar",
TEMP_DIR, true, false, null, null,
+ config)) {
PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(),
1024);
buf.putInt(0, 111);
buf = sfd.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
index b675d866e5..7c4d0e1729 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.spi;
+import java.io.File;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -42,4 +43,9 @@ public interface MutableSegment extends IndexSegment {
* @return The number of records indexed
*/
int getNumDocsIndexed();
+
+ /**
+ * Returns the consumer dir containing any segment files.
+ */
+ File getConsumerDir();
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 3ebe041e87..5d3a1a78ba 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -97,6 +97,11 @@ public interface IndexCreationContext {
*/
boolean isRealtimeConversion();
+ /**
+ * Used in conjunction with isRealtimeConversion, this returns the location
of the consumer directory used
+ */
+ File getConsumerDir();
+
/**
* This contains immutableToMutableIdMap mapping generated in {@link
SegmentIndexCreationDriver}
*
@@ -127,6 +132,7 @@ public interface IndexCreationContext {
private boolean _fixedLength;
private boolean _textCommitOnClose;
private boolean _realtimeConversion = false;
+ private File _consumerDir;
private int[] _immutableToMutableIdMap;
public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo
columnIndexCreationInfo) {
@@ -250,6 +256,11 @@ public interface IndexCreationContext {
return this;
}
+ public Builder withConsumerDir(File consumerDir) {
+ _consumerDir = consumerDir;
+ return this;
+ }
+
public Builder withImmutableToMutableIdMap(int[] immutableToMutableIdMap) {
_immutableToMutableIdMap = immutableToMutableIdMap;
return this;
@@ -260,7 +271,7 @@ public interface IndexCreationContext {
_maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec),
_sorted, _cardinality,
_totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue,
_maxValue, _forwardIndexDisabled,
_sortedUniqueElementsArray, _optimizedDictionary, _fixedLength,
_textCommitOnClose, _columnStatistics,
- _realtimeConversion, _immutableToMutableIdMap);
+ _realtimeConversion, _consumerDir, _immutableToMutableIdMap);
}
public Builder withSortedUniqueElementsArray(Object
sortedUniqueElementsArray) {
@@ -295,6 +306,7 @@ public interface IndexCreationContext {
private final boolean _textCommitOnClose;
private final ColumnStatistics _columnStatistics;
private final boolean _realtimeConversion;
+ private final File _consumerDir;
private final int[] _immutableToMutableIdMap;
public Common(File indexDir, int lengthOfLongestEntry,
@@ -302,7 +314,7 @@ public interface IndexCreationContext {
FieldSpec fieldSpec, boolean sorted, int cardinality, int
totalNumberOfEntries,
int totalDocs, boolean hasDictionary, Comparable<?> minValue,
Comparable<?> maxValue,
boolean forwardIndexDisabled, Object sortedUniqueElementsArray,
boolean optimizeDictionary, boolean fixedLength,
- boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean
realtimeConversion,
+ boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean
realtimeConversion, File consumerDir,
int[] immutableToMutableIdMap) {
_indexDir = indexDir;
_lengthOfLongestEntry = lengthOfLongestEntry;
@@ -324,6 +336,7 @@ public interface IndexCreationContext {
_textCommitOnClose = textCommitOnClose;
_columnStatistics = columnStatistics;
_realtimeConversion = realtimeConversion;
+ _consumerDir = consumerDir;
_immutableToMutableIdMap = immutableToMutableIdMap;
}
@@ -416,6 +429,11 @@ public interface IndexCreationContext {
return _realtimeConversion;
}
+ @Override
+ public File getConsumerDir() {
+ return _consumerDir;
+ }
+
@Override
public int[] getImmutableToMutableIdMap() {
return _immutableToMutableIdMap;
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index 5381bdc430..c879c1be52 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -121,6 +121,8 @@ public class SegmentGeneratorConfig implements Serializable
{
private boolean _optimizeDictionaryForMetrics = false;
private double _noDictionarySizeRatioThreshold =
IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD;
private boolean _realtimeConversion = false;
+ // consumerDir contains data from the consuming segment, and is used during
_realtimeConversion optimization
+ private File _consumerDir;
private final Map<String, FieldIndexConfigs> _indexConfigsByColName;
// constructed from FieldConfig
@@ -732,6 +734,14 @@ public class SegmentGeneratorConfig implements
Serializable {
_realtimeConversion = realtimeConversion;
}
+ public File getConsumerDir() {
+ return _consumerDir;
+ }
+
+ public void setConsumerDir(File consumerDir) {
+ _consumerDir = consumerDir;
+ }
+
public void setNoDictionarySizeRatioThreshold(double
noDictionarySizeRatioThreshold) {
_noDictionarySizeRatioThreshold = noDictionarySizeRatioThreshold;
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
index ab152c0fd2..57d71aa278 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
@@ -20,7 +20,6 @@ package org.apache.pinot.segment.spi.index.mutable.provider;
import java.io.File;
import java.util.Objects;
-import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.spi.data.FieldSpec;
@@ -94,7 +93,6 @@ public class MutableIndexContext {
return _avgNumMultiValues;
}
- @Nullable
public File getConsumerDir() {
return _consumerDir;
}
@@ -114,7 +112,6 @@ public class MutableIndexContext {
private int _estimatedColSize;
private int _estimatedCardinality;
private int _avgNumMultiValues;
- @Nullable
private File _consumerDir;
public Builder withMemoryManager(PinotDataBufferMemoryManager
memoryManager) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]