This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 449c95c6b9 Making segmentMapper do the init and cleanup of 
RecordReader (#10874)
449c95c6b9 is described below

commit 449c95c6b93499417a75574607d160231158e2e3
Author: swaminathanmanish <[email protected]>
AuthorDate: Tue Jun 13 20:39:25 2023 -0700

    Making segmentMapper do the init and cleanup of RecordReader (#10874)
---
 .../framework/SegmentProcessorFramework.java       | 34 ++++++++--
 .../segment/processing/mapper/SegmentMapper.java   | 75 ++++++++++++++--------
 .../processing/framework/SegmentMapperTest.java    |  4 +-
 .../framework/SegmentProcessorFrameworkTest.java   | 37 +++++++++++
 .../spi/data/readers/RecordReaderFileConfig.java   | 57 ++++++++++++++++
 5 files changed, 172 insertions(+), 35 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index d64f35f515..f1757c1e4c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -41,6 +41,7 @@ import 
org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
 public class SegmentProcessorFramework {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentProcessorFramework.class);
 
-  private final List<RecordReader> _recordReaders;
+  private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
   private final SegmentProcessorConfig _segmentProcessorConfig;
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
@@ -66,17 +67,26 @@ public class SegmentProcessorFramework {
   private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
 
   /**
-   * Initializes the SegmentProcessorFramework with record readers, config and 
working directory.
+   * Initializes the SegmentProcessorFramework with record readers, config and 
working directory. We will now rely on
+   * users passing RecordReaderFileConfig, since that also allows us to do 
lazy initialization of RecordReaders.
+   * Please use the other constructor that uses RecordReaderFileConfig.
    */
+  @Deprecated
   public SegmentProcessorFramework(List<RecordReader> recordReaders, 
SegmentProcessorConfig segmentProcessorConfig,
       File workingDir)
       throws IOException {
-    Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is 
provided");
+    this(segmentProcessorConfig, workingDir, 
convertRecordReadersToRecordReaderFileConfig(recordReaders));
+  }
+
+  public SegmentProcessorFramework(SegmentProcessorConfig 
segmentProcessorConfig, File workingDir,
+      List<RecordReaderFileConfig> recordReaderFileConfigs)
+      throws IOException {
 
+    Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No 
recordReaderFileConfigs provided");
     LOGGER.info("Initializing SegmentProcessorFramework with {} record 
readers, config: {}, working dir: {}",
-        recordReaders.size(), segmentProcessorConfig, 
workingDir.getAbsolutePath());
+        recordReaderFileConfigs.size(), segmentProcessorConfig, 
workingDir.getAbsolutePath());
+    _recordReaderFileConfigs = recordReaderFileConfigs;
 
-    _recordReaders = recordReaders;
     _segmentProcessorConfig = segmentProcessorConfig;
 
     _mapperOutputDir = new File(workingDir, "mapper_output");
@@ -87,6 +97,16 @@ public class SegmentProcessorFramework {
     FileUtils.forceMkdir(_segmentsOutputDir);
   }
 
+  private static List<RecordReaderFileConfig> 
convertRecordReadersToRecordReaderFileConfig(
+      List<RecordReader> recordReaders) {
+    Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is 
provided");
+    List<RecordReaderFileConfig> recordReaderFileConfigs = new ArrayList<>();
+    for (RecordReader recordReader : recordReaders) {
+      recordReaderFileConfigs.add(new RecordReaderFileConfig(recordReader));
+    }
+    return recordReaderFileConfigs;
+  }
+
   /**
    * Processes records from record readers per the provided config, returns 
the directories for the generated segments.
    */
@@ -114,8 +134,8 @@ public class SegmentProcessorFramework {
   private List<File> doProcess()
       throws Exception {
     // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", 
_recordReaders.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaders, 
_segmentProcessorConfig, _mapperOutputDir);
+    LOGGER.info("Beginning map phase on {} record readers", 
_recordReaderFileConfigs.size());
+    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, 
_segmentProcessorConfig, _mapperOutputDir);
     _partitionToFileManagerMap = mapper.map();
 
     // Check for mapper output files
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 302aa2869f..dae6fd52ee 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -44,6 +44,8 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +62,7 @@ import org.slf4j.LoggerFactory;
 public class SegmentMapper {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentMapper.class);
 
-  private final List<RecordReader> _recordReaders;
+  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
   private final SegmentProcessorConfig _processorConfig;
   private final File _mapperOutputDir;
 
@@ -75,8 +77,9 @@ public class SegmentMapper {
   // NOTE: Use TreeMap so that the order is deterministic
   private final Map<String, GenericRowFileManager> _partitionToFileManagerMap 
= new TreeMap<>();
 
-  public SegmentMapper(List<RecordReader> recordReaders, 
SegmentProcessorConfig processorConfig, File mapperOutputDir) {
-    _recordReaders = recordReaders;
+  public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
+      SegmentProcessorConfig processorConfig, File mapperOutputDir) {
+    _recordReaderFileConfigs = recordReaderFileConfigs;
     _processorConfig = processorConfig;
     _mapperOutputDir = mapperOutputDir;
 
@@ -97,8 +100,9 @@ public class SegmentMapper {
     }
     // Time partition + partition from partitioners
     _partitionsBuffer = new String[numPartitioners + 1];
+
     LOGGER.info("Initialized mapper with {} record readers, output dir: {}, 
timeHandler: {}, partitioners: {}",
-        _recordReaders.size(), _mapperOutputDir, _timeHandler.getClass(),
+        _recordReaderFileConfigs.size(), _mapperOutputDir, 
_timeHandler.getClass(),
         Arrays.stream(_partitioners).map(p -> 
p.getClass().toString()).collect(Collectors.joining(",")));
   }
 
@@ -122,33 +126,24 @@ public class SegmentMapper {
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaders.size();
+    int totalCount = _recordReaderFileConfigs.size();
     int count = 1;
     GenericRow reuse = new GenericRow();
-    for (RecordReader recordReader : _recordReaders) {
-      observer.accept(String.format("Doing map phase on data from RecordReader 
(%d out of %d)", count++, totalCount));
-      while (recordReader.hasNext()) {
-        reuse = recordReader.next(reuse);
-
-        // TODO: Add ComplexTypeTransformer here. Currently it is not 
idempotent so cannot add it
-
-        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-          //noinspection unchecked
-          for (GenericRow row : (Collection<GenericRow>) 
reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
-            GenericRow transformedRow = _recordTransformer.transform(row);
-            if (transformedRow != null && 
IngestionUtils.shouldIngestRow(transformedRow)) {
-              writeRecord(transformedRow);
-            }
-          }
-        } else {
-          GenericRow transformedRow = _recordTransformer.transform(reuse);
-          if (transformedRow != null && 
IngestionUtils.shouldIngestRow(transformedRow)) {
-            writeRecord(transformedRow);
-          }
+    for (RecordReaderFileConfig recordReaderFileConfig : 
_recordReaderFileConfigs) {
+      RecordReader recordReader = recordReaderFileConfig._recordReader;
+      if (recordReader == null) {
+        try {
+          recordReader =
+              
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, 
recordReaderFileConfig._dataFile,
+                  recordReaderFileConfig._fieldsToRead, 
recordReaderFileConfig._recordReaderConfig);
+          mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+        } finally {
+          recordReader.close();
         }
-
-        reuse.clear();
+      } else {
+        mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
       }
+      count++;
     }
 
     for (GenericRowFileManager fileManager : 
_partitionToFileManagerMap.values()) {
@@ -158,6 +153,32 @@ public class SegmentMapper {
     return _partitionToFileManagerMap;
   }
 
+  private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+      Consumer<Object> observer, int count, int totalCount) throws Exception {
+    observer.accept(String.format("Doing map phase on data from RecordReader 
(%d out of %d)", count, totalCount));
+    while (recordReader.hasNext()) {
+      reuse = recordReader.next(reuse);
+
+      // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent 
so cannot add it
+
+      if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+        //noinspection unchecked
+        for (GenericRow row : (Collection<GenericRow>) 
reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+          GenericRow transformedRow = _recordTransformer.transform(row);
+          if (transformedRow != null && 
IngestionUtils.shouldIngestRow(transformedRow)) {
+            writeRecord(transformedRow);
+          }
+        }
+      } else {
+        GenericRow transformedRow = _recordTransformer.transform(reuse);
+        if (transformedRow != null && 
IngestionUtils.shouldIngestRow(transformedRow)) {
+          writeRecord(transformedRow);
+        }
+      }
+      reuse.clear();
+    }
+  }
+
   private void writeRecord(GenericRow row)
       throws IOException {
     String timePartition = _timeHandler.handleTime(row);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 3616e190d6..3a3ee33398 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -141,7 +142,8 @@ public class SegmentMapperTest {
     PinotSegmentRecordReader segmentRecordReader = new 
PinotSegmentRecordReader();
     segmentRecordReader.init(_indexDir, null, null, true);
     SegmentMapper segmentMapper =
-        new SegmentMapper(Collections.singletonList(segmentRecordReader), 
processorConfig, mapperOutputDir);
+        new SegmentMapper(Collections.singletonList(new 
RecordReaderFileConfig(segmentRecordReader)),
+            processorConfig, mapperOutputDir);
     Map<String, GenericRowFileManager> partitionToFileManagerMap = 
segmentMapper.map();
     segmentRecordReader.close();
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index c5868063e7..e977ad364c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.segment.processing.framework;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -42,8 +43,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
@@ -175,6 +178,40 @@ public class SegmentProcessorFrameworkTest {
     }
   }
 
+  /**
+   * Test lazy initialization of record readers. Here we create
+   * RecordReaderFileConfig and the actual reader is initialized during the
+   * map phase.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderFileConfigInit() throws Exception {
+    File workingDir = new File(TEMP_DIR, "segmentOutput");
+    FileUtils.forceMkdir(workingDir);
+    ClassLoader classLoader = getClass().getClassLoader();
+    URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
+    RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV,
+        new File(resource.toURI()),
+        null, null);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").
+        setTimeColumnName("time").build();
+
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("teamId",
+                DataType.STRING, "")
+            .addSingleValueDimension("teamName", DataType.STRING, "")
+            .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:MILLISECONDS").build();
+
+    SegmentProcessorConfig config =
+        new 
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
+    SegmentProcessorFramework framework = new 
SegmentProcessorFramework(config, workingDir, List.of(reader));
+    List<File> outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    ImmutableSegment segment = 
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    assertEquals(segmentMetadata.getTotalDocs(), 51);
+  }
+
   @Test
   public void testSingleSegment()
       throws Exception {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
new file mode 100644
index 0000000000..628de1052a
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.File;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * Wraps RecordReader info to instantiate a reader. Users can either pass in 
the
+ * RecordReader instance directly or the info required to initialize the 
RecordReader, so that the
+ * RecordReader can be initialized just when it's about to be used, which 
avoids early/eager
+ * initialization/memory allocation.
+ */
+public class RecordReaderFileConfig {
+  public final FileFormat _fileFormat;
+  public final File _dataFile;
+  public final Set<String> _fieldsToRead;
+  public final RecordReaderConfig _recordReaderConfig;
+  public final RecordReader _recordReader;
+
+  // Pass in the info needed to initialize the reader
+  public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, 
Set<String> fieldsToRead,
+      @Nullable RecordReaderConfig recordReaderConfig) {
+    _fileFormat = fileFormat;
+    _dataFile = dataFile;
+    _fieldsToRead = fieldsToRead;
+    _recordReaderConfig = recordReaderConfig;
+    _recordReader = null;
+  }
+
+  // Pass in the reader instance directly
+  public RecordReaderFileConfig(RecordReader recordReader) {
+    _recordReader = recordReader;
+    _fileFormat = null;
+    _dataFile = null;
+    _fieldsToRead = null;
+    _recordReaderConfig = null;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to