This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fcccfc  Add genericRow file reader and writer (#6997)
6fcccfc is described below

commit 6fcccfc39c0179b9ff856d5ebc11098dbaaaccf1
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 27 23:10:15 2021 -0700

    Add genericRow file reader and writer (#6997)
    
    Add file reader and writer for genericRow leveraging the ser/de classes
    Replace the usage of ser/de to the file reader/writer
---
 .../processing/collector/ConcatCollector.java      | 102 +++++++--------------
 .../GenericRowDeserializer.java                    |   2 +-
 .../genericrow/GenericRowFileReader.java           |  81 ++++++++++++++++
 .../genericrow/GenericRowFileWriter.java           |  72 +++++++++++++++
 .../GenericRowSerializer.java                      |   2 +-
 .../{serde => genericrow}/GenericRowSerDeTest.java |   2 +-
 6 files changed, 190 insertions(+), 71 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
index d760274..d7cc173 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
@@ -20,10 +20,7 @@ package org.apache.pinot.core.segment.processing.collector;
 
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.Arrays;
-import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -31,9 +28,8 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.segment.processing.serde.GenericRowDeserializer;
-import org.apache.pinot.core.segment.processing.serde.GenericRowSerializer;
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
@@ -44,26 +40,20 @@ import org.apache.pinot.spi.data.readers.GenericRow;
  * A Collector implementation for collecting and concatenating all incoming 
rows.
  */
 public class ConcatCollector implements Collector {
-  private static final String RECORDS_FILE_NAME = "collector.records";
+  private static final String RECORD_OFFSET_FILE_NAME = "record.offset";
+  private static final String RECORD_DATA_FILE_NAME = "record.data";
 
   private final List<FieldSpec> _fieldSpecs = new ArrayList<>();
-  private final GenericRowSerializer _genericRowSerializer;
   private final int _numSortColumns;
   private final SortOrderComparator _sortOrderComparator;
   private final File _workingDir;
-  private final File _collectorRecordFile;
+  private final File _recordOffsetFile;
+  private final File _recordDataFile;
 
+  private GenericRowFileWriter _recordFileWriter;
+  private GenericRowFileReader _recordFileReader;
   private int _numDocs;
 
-  // TODO: Avoid using BufferedOutputStream, and use ByteBuffer directly.
-  //  However, ByteBuffer has a limitation that the size cannot exceed 2G.
-  //  There are no limits on the size of data inserted into the {@link 
Collector}.
-  //  Hence, would need to implement a hybrid approach or a trigger a flush 
when size exceeds on Collector.
-  private BufferedOutputStream _collectorRecordOutputStream;
-  private List<Long> _collectorRecordOffsets;
-  private PinotDataBuffer _collectorRecordBuffer;
-  private GenericRowDeserializer _genericRowDeserializer;
-
   public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
     List<String> sortOrder = collectorConfig.getSortOrder();
     if (CollectionUtils.isNotEmpty(sortOrder)) {
@@ -92,48 +82,33 @@ public class ConcatCollector implements Collector {
         }
       }
     }
-    // TODO: Pass 'includeNullFields' from the config
-    _genericRowSerializer = new GenericRowSerializer(_fieldSpecs, true);
 
     _workingDir =
         new File(FileUtils.getTempDirectory(), 
String.format("concat_collector_%d", System.currentTimeMillis()));
     Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s 
for %s with config: %s",
         _workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(), 
collectorConfig);
-    _collectorRecordFile = new File(_workingDir, RECORDS_FILE_NAME);
-
-    initializeBuffer();
-  }
+    _recordOffsetFile = new File(_workingDir, RECORD_OFFSET_FILE_NAME);
+    _recordDataFile = new File(_workingDir, RECORD_DATA_FILE_NAME);
 
-  private void initializeBuffer() {
-    Preconditions.checkState(!_collectorRecordFile.exists(),
-        "Collector record file: " + _collectorRecordFile + " already exists");
     try {
-      _collectorRecordOutputStream = new BufferedOutputStream(new 
FileOutputStream(_collectorRecordFile));
-    } catch (FileNotFoundException e) {
-      throw new RuntimeException(e);
+      reset();
+    } catch (IOException e) {
+      throw new RuntimeException("Caught exception while resetting the 
collector", e);
     }
-    _collectorRecordOffsets = new ArrayList<>();
-    _collectorRecordOffsets.add(0L);
-    _numDocs = 0;
   }
 
   @Override
   public void collect(GenericRow genericRow)
       throws IOException {
-    byte[] genericRowBytes = _genericRowSerializer.serialize(genericRow);
-    _collectorRecordOutputStream.write(genericRowBytes);
-    _collectorRecordOffsets.add(_collectorRecordOffsets.get(_numDocs) + 
genericRowBytes.length);
+    _recordFileWriter.write(genericRow);
     _numDocs++;
   }
 
   @Override
   public Iterator<GenericRow> iterator()
       throws IOException {
-    _collectorRecordOutputStream.flush();
-    _collectorRecordBuffer = PinotDataBuffer
-        .mapFile(_collectorRecordFile, true, 0, 
_collectorRecordOffsets.get(_numDocs), PinotDataBuffer.NATIVE_ORDER,
-            "ConcatCollector: generic row buffer");
-    _genericRowDeserializer = new 
GenericRowDeserializer(_collectorRecordBuffer, _fieldSpecs, true);
+    _recordFileWriter.close();
+    _recordFileReader = new GenericRowFileReader(_recordOffsetFile, 
_recordDataFile, _fieldSpecs, true);
 
     // TODO: A lot of this code can be made common across Collectors, once 
{@link RollupCollector} is also converted to off heap implementation
     if (_numSortColumns != 0) {
@@ -142,12 +117,9 @@ public class ConcatCollector implements Collector {
         sortedDocIds[i] = i;
       }
 
-      Arrays.quickSort(0, _numDocs, (i1, i2) -> {
-        long startOffset1 = _collectorRecordOffsets.get(sortedDocIds[i1]);
-        long startOffset2 = _collectorRecordOffsets.get(sortedDocIds[i2]);
-        return 
_sortOrderComparator.compare(_genericRowDeserializer.partialDeserialize(startOffset1,
 _numSortColumns),
-            _genericRowDeserializer.partialDeserialize(startOffset2, 
_numSortColumns));
-      }, (i1, i2) -> {
+      Arrays.quickSort(0, _numDocs, (i1, i2) -> _sortOrderComparator
+          .compare(_recordFileReader.partialRead(sortedDocIds[i1], 
_numSortColumns),
+              _recordFileReader.partialRead(sortedDocIds[i2], 
_numSortColumns)), (i1, i2) -> {
         int temp = sortedDocIds[i1];
         sortedDocIds[i1] = sortedDocIds[i2];
         sortedDocIds[i2] = temp;
@@ -170,13 +142,8 @@ public class ConcatCollector implements Collector {
 
       @Override
       public GenericRow next() {
-        long offset;
-        if (sortedDocIds == null) {
-          offset = _collectorRecordOffsets.get(_nextDocId++);
-        } else {
-          offset = _collectorRecordOffsets.get(sortedDocIds[_nextDocId++]);
-        }
-        return _genericRowDeserializer.deserialize(offset, _reuse);
+        int docId = sortedDocIds != null ? sortedDocIds[_nextDocId++] : 
_nextDocId++;
+        return _recordFileReader.read(docId, _reuse);
       }
     };
   }
@@ -189,28 +156,27 @@ public class ConcatCollector implements Collector {
   @Override
   public void reset()
       throws IOException {
-    try {
-      if (_collectorRecordBuffer != null) {
-        _collectorRecordBuffer.close();
-      }
-      if (_collectorRecordOutputStream != null) {
-        _collectorRecordOutputStream.close();
-      }
-    } finally {
-      FileUtils.deleteQuietly(_collectorRecordFile);
+    if (_recordFileWriter != null) {
+      _recordFileWriter.close();
     }
-    initializeBuffer();
+    if (_recordFileReader != null) {
+      _recordFileReader.close();
+    }
+    FileUtils.cleanDirectory(_workingDir);
+    // TODO: Pass 'includeNullFields' from the config
+    _recordFileWriter = new GenericRowFileWriter(_recordOffsetFile, 
_recordDataFile, _fieldSpecs, true);
+    _numDocs = 0;
   }
 
   @Override
   public void close()
       throws IOException {
     try {
-      if (_collectorRecordBuffer != null) {
-        _collectorRecordBuffer.close();
+      if (_recordFileWriter != null) {
+        _recordFileWriter.close();
       }
-      if (_collectorRecordOutputStream != null) {
-        _collectorRecordOutputStream.close();
+      if (_recordFileReader != null) {
+        _recordFileReader.close();
       }
     } finally {
       FileUtils.deleteQuietly(_workingDir);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowDeserializer.java
similarity index 99%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowDeserializer.java
index 83a5863..caee49b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowDeserializer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.processing.serde;
+package org.apache.pinot.core.segment.processing.genericrow;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
new file mode 100644
index 0000000..0e1ba1b
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.List;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * File reader for {@link GenericRow}. The input files should be generated by 
the {@link GenericRowFileWriter}.
+ */
+public class GenericRowFileReader implements Closeable {
+  private final int _numRows;
+  private final PinotDataBuffer _offsetBuffer;
+  private final PinotDataBuffer _dataBuffer;
+  private final GenericRowDeserializer _deserializer;
+
+  public GenericRowFileReader(File offsetFile, File dataFile, List<FieldSpec> 
fieldSpecs, boolean includeNullFields)
+      throws IOException {
+    long offsetFileLength = offsetFile.length();
+    _numRows = (int) (offsetFileLength >>> 3); // offsetFileLength / Long.BYTES
+    _offsetBuffer = PinotDataBuffer
+        .mapFile(offsetFile, true, 0L, offsetFileLength, ByteOrder.BIG_ENDIAN, 
"GenericRow offset buffer");
+    _dataBuffer = PinotDataBuffer
+        .mapFile(dataFile, true, 0L, dataFile.length(), 
PinotDataBuffer.NATIVE_ORDER, "GenericRow data buffer");
+    _deserializer = new GenericRowDeserializer(_dataBuffer, fieldSpecs, 
includeNullFields);
+  }
+
+  /**
+   * Returns the number of rows within the files.
+   */
+  public int getNumRows() {
+    return _numRows;
+  }
+
+  /**
+   * Reads the data of the given row id into the given reusable row.
+   */
+  public GenericRow read(int rowId, GenericRow reuse) {
+    long offset = _offsetBuffer.getLong((long) rowId << 3); // rowId * 
Long.BYTES
+    _deserializer.deserialize(offset, reuse);
+    return reuse;
+  }
+
+  /**
+   * Reads the first several fields of the given row id.
+   */
+  public Object[] partialRead(int rowId, int numFields) {
+    long offset = _offsetBuffer.getLong((long) rowId << 3);
+    return _deserializer.partialDeserialize(offset, numFields);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _offsetBuffer.close();
+    _dataBuffer.close();
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
new file mode 100644
index 0000000..4f7957c
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * File writer for {@link GenericRow}. The writer will generate to 2 files, 
one for the offsets (BIG_ENDIAN) and one for
+ * the actual data (NATIVE_ORDER). The generated files can be read by the 
{@link GenericRowFileReader}. There is no
+ * version control for the files generated because the files should only be 
used as intermediate format and read in the
+ * same host (different host might have different NATIVE_ORDER).
+ *
+ * TODO: Consider using ByteBuffer instead of OutputStream.
+ */
+public class GenericRowFileWriter implements Closeable {
+  private final DataOutputStream _offsetStream;
+  private final BufferedOutputStream _dataStream;
+  private final GenericRowSerializer _serializer;
+
+  private long _nextOffset;
+
+  public GenericRowFileWriter(File offsetFile, File dataFile, List<FieldSpec> 
fieldSpecs, boolean includeNullFields)
+      throws FileNotFoundException {
+    _offsetStream = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(offsetFile)));
+    _dataStream = new BufferedOutputStream(new FileOutputStream(dataFile));
+    _serializer = new GenericRowSerializer(fieldSpecs, includeNullFields);
+  }
+
+  /**
+   * Writes the given row into the files.
+   */
+  public void write(GenericRow genericRow)
+      throws IOException {
+    _offsetStream.writeLong(_nextOffset);
+    byte[] bytes = _serializer.serialize(genericRow);
+    _dataStream.write(bytes);
+    _nextOffset += bytes.length;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _offsetStream.close();
+    _dataStream.close();
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerializer.java
similarity index 99%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerializer.java
index 7d70201..7549670 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerializer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.processing.serde;
+package org.apache.pinot.core.segment.processing.genericrow;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerDeTest.java
similarity index 99%
rename from 
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java
rename to 
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerDeTest.java
index fc2d8ab..cc85316 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerDeTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.processing.serde;
+package org.apache.pinot.core.segment.processing.genericrow;
 
 import java.util.Arrays;
 import java.util.Collections;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to