This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9a8fa79199 Allow passing custom record reader to be inited/closed in SegmentProcessorFramework (#12529)
9a8fa79199 is described below
commit 9a8fa791999cb792943fdaab19b498daeb9fb0dc
Author: swaminathanmanish <[email protected]>
AuthorDate: Fri Mar 1 16:56:59 2024 -0800
Allow passing custom record reader to be inited/closed in SegmentProcessorFramework (#12529)
---
.../framework/SegmentProcessorFrameworkTest.java | 17 +++++++++------
.../spi/data/readers/RecordReaderFileConfig.java | 25 +++++++++++++---------
2 files changed, 26 insertions(+), 16 deletions(-)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 0ec9261d92..c2c4c51789 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -194,9 +195,11 @@ public class SegmentProcessorFrameworkTest {
FileUtils.forceMkdir(workingDir);
ClassLoader classLoader = getClass().getClassLoader();
URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
- RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV,
- new File(resource.toURI()),
+ RecordReader recordReader =
RecordReaderFactory.getRecordReader(FileFormat.CSV, new File(resource.toURI()),
null, null);
+ RecordReaderFileConfig recordReaderFileConfig = new
RecordReaderFileConfig(FileFormat.CSV,
+ new File(resource.toURI()),
+ null, null, recordReader);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").
setTimeColumnName("time").build();
@@ -208,13 +211,15 @@ public class SegmentProcessorFrameworkTest {
SegmentProcessorConfig config =
new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
- SegmentProcessorFramework framework = new
SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader),
- Collections.emptyList(), null);
+ SegmentProcessorFramework framework = new
SegmentProcessorFramework(config, workingDir,
+ ImmutableList.of(recordReaderFileConfig), Collections.emptyList(),
null);
List<File> outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
ImmutableSegment segment =
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 52);
+ // Verify reader is closed
+
assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(),
true);
}
@Test
@@ -686,7 +691,7 @@ public class SegmentProcessorFrameworkTest {
ClassLoader classLoader = getClass().getClassLoader();
URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
RecordReaderFileConfig recordReaderFileConfig =
- new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()),
null, null);
+ new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()),
null, null, null);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
Schema schema =
@@ -738,7 +743,7 @@ public class SegmentProcessorFrameworkTest {
// output size threshold configured).
expectedTotalDocsCount = 52;
- recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new
File(resource.toURI()), null, null);
+ recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new
File(resource.toURI()), null, null, null);
segmentConfig = new
SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix")
.setSegmentNamePostfix("testPostfix").build();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index 51e4ed0cfb..e7566cb0ff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -24,10 +24,8 @@ import javax.annotation.Nullable;
/**
- * Wraps RecordReader info to instantiate a reader. Users can either pass in
the
- * RecordReader instance directly or the info required to initialize the
RecordReader, so that the
- * RecordReader can be initialized just when its about to be used, which
avoids early/eager
- * initialization/memory allocation.
+ * Placeholder for all RecordReader configs. Manages the lifecycle of a
RecordReader by initing/closing within the
+ * Segment creation framework.
*/
public class RecordReaderFileConfig {
public final FileFormat _fileFormat;
@@ -44,20 +42,22 @@ public class RecordReaderFileConfig {
// Pass in the info needed to initialize the reader
public RecordReaderFileConfig(FileFormat fileFormat, File dataFile,
Set<String> fieldsToRead,
- @Nullable RecordReaderConfig recordReaderConfig) {
+ @Nullable RecordReaderConfig recordReaderConfig, @Nullable RecordReader
recordReader) {
_fileFormat = fileFormat;
_dataFile = dataFile;
_fieldsToRead = fieldsToRead;
_recordReaderConfig = recordReaderConfig;
- _recordReader = null;
- // This is not a delegate RecordReader i.e. RecordReaderFileConfig owns
the RecordReader, so it should be closed
- // by RecordReaderFileConfig as well.
+ // Users can pass in custom readers
+ _recordReader = recordReader;
+ // RecordReaderFileConfig owns the lifecycle of RecordReader, to be inited
and closed.
_isDelegateReader = false;
_isRecordReaderInitialized = false;
_isRecordReaderClosed = false;
}
- // Pass in the reader instance directly
+ // Keeping this for backwards compatibility. We want the lifecycle of the
reader to be managed internally
+ // (inited/closed) by SegmentProcessorFramework.
+ @Deprecated
public RecordReaderFileConfig(RecordReader recordReader) {
_recordReader = recordReader;
_fileFormat = null;
@@ -76,7 +76,12 @@ public class RecordReaderFileConfig {
public RecordReader getRecordReader()
throws Exception {
if (!_isRecordReaderInitialized) {
- _recordReader = RecordReaderFactory.getRecordReader(_fileFormat,
_dataFile, _fieldsToRead, _recordReaderConfig);
+ if (_recordReader == null) {
+ // Record reader instance to be created and inited
+ _recordReader = RecordReaderFactory.getRecordReader(_fileFormat,
_dataFile, _fieldsToRead, _recordReaderConfig);
+ } else {
+ _recordReader.init(_dataFile, _fieldsToRead, _recordReaderConfig);
+ }
_isRecordReaderInitialized = true;
}
return _recordReader;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]