This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch benchmark_forward_index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/benchmark_forward_index by
this push:
new 68b0041 initial changes for fast int reader writer for SV fwd index
68b0041 is described below
commit 68b00418b3bb388bfc6da3f318dbaaae06a4f465
Author: Siddharth Teotia <[email protected]>
AuthorDate: Fri Apr 24 19:06:11 2020 -0700
initial changes for fast int reader writer for SV fwd index
---
.../reader/impl/v1/FixedBitSingleValueReader.java | 9 +-
.../core/io/util/FastFixedBitIntReaderWriter.java | 141 +++++++++++++++++++++
.../writer/impl/v1/FixedBitSingleValueWriter.java | 5 +-
.../apache/pinot/perf/ForwardIndexBenchmark.java | 4 +-
4 files changed, 149 insertions(+), 10 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/FixedBitSingleValueReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/FixedBitSingleValueReader.java
index dafe031..aef73b3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/FixedBitSingleValueReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/FixedBitSingleValueReader.java
@@ -21,15 +21,16 @@ package org.apache.pinot.core.io.reader.impl.v1;
import java.io.IOException;
import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
import org.apache.pinot.core.io.reader.ReaderContext;
+import org.apache.pinot.core.io.util.FastFixedBitIntReaderWriter;
import org.apache.pinot.core.io.util.FixedBitIntReaderWriter;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
public final class FixedBitSingleValueReader extends
BaseSingleColumnSingleValueReader {
- private final FixedBitIntReaderWriter _reader;
+ private final FastFixedBitIntReaderWriter _reader;
public FixedBitSingleValueReader(PinotDataBuffer dataBuffer, int numRows,
int numBitsPerValue) {
- _reader = new FixedBitIntReaderWriter(dataBuffer, numRows,
numBitsPerValue);
+ _reader = new FastFixedBitIntReaderWriter(dataBuffer, numRows,
numBitsPerValue);
}
@Override
@@ -50,10 +51,6 @@ public final class FixedBitSingleValueReader extends
BaseSingleColumnSingleValue
}
}
- public void readBulk(int startIndex, int length, int[] buffer) {
- _reader.readInt(startIndex, length, buffer);
- }
-
@Override
public ReaderContext createContext() {
return null;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/io/util/FastFixedBitIntReaderWriter.java
b/pinot-core/src/main/java/org/apache/pinot/core/io/util/FastFixedBitIntReaderWriter.java
new file mode 100644
index 0000000..e2e60c4
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/io/util/FastFixedBitIntReaderWriter.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import me.lemire.integercompression.BitPacking;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+public final class FastFixedBitIntReaderWriter implements Closeable {
+ private final int _numBitsPerValue;
+ private final int chunkToPack[];
+ private final int packedChunk[];
+ private final int unpackedChunk[];
+ private int currentIndexInChunk = 0;
+ private final PinotDataBuffer _dataBuffer;
+ private int _offset = 0;
+ private static final int PACK_BOUNDARY = 32;
+
+ // Currently only supports SV columns
+ public FastFixedBitIntReaderWriter(PinotDataBuffer dataBuffer, int
numValues, int numBitsPerValue) {
+ // TODO: this should be rounded off to next multiple of 4
+ Preconditions
+ .checkState(dataBuffer.size() == (int) (((long) numValues *
numBitsPerValue + Byte.SIZE - 1) / Byte.SIZE));
+ chunkToPack = new int[PACK_BOUNDARY];
+ unpackedChunk = new int[numBitsPerValue];
+ packedChunk = new int[numBitsPerValue];
+ // this should be power of 2 -- 2, 4, 8, 16, 32
+ // to ensure that within a 32 bit int value, we can pack
32/numBitsPerValue ints
+ // completely without straddling.
+ // e.g pack 4 ints into single int by using 8 bits per value
+ _numBitsPerValue = numBitsPerValue;
+ _dataBuffer = dataBuffer;
+ }
+
+ public int readInt(int index) {
+ long bitOffset = (long) index * _numBitsPerValue;
+ int byteOffset = (int) (bitOffset / Byte.SIZE);
+ if (bitOffset % 32 != 0) {
+ // read the int containing the packed value at this
+ // index and unpack all values packed in that int.
+ switch (_numBitsPerValue) {
+ case 2:
+ fastUnpack2(byteOffset);
+ case 4:
+ fastUnpack4(byteOffset);
+ case 8:
+ fastUnpack8(byteOffset);
+ case 16:
+ fastUnpack16(byteOffset);
+ }
+ }
+ // always keep 32/numBitsPerValue number of integers
+ // unpacked by reading a single int. Depending on
+ // the value of numBitsPerValue, we could have
+ // 2, 4, 8, 16 integers unpacked from a single int
+ return unpackedChunk[index % _numBitsPerValue];
+ }
+
+ private void fastUnpack16(int byteOffset) {
+ int val = _dataBuffer.getInt(byteOffset);
+ unpackedChunk[0] = (val & 65535);
+ unpackedChunk[1] = (val >>> 16);
+ }
+
+ private void fastUnpack8(int byteOffset) {
+ int val = _dataBuffer.getInt(byteOffset);
+ unpackedChunk[0] = (val & 255);
+ unpackedChunk[1] = ((val >>> 8) & 255);
+ unpackedChunk[2] = ((val >>> 16) & 255);
+ unpackedChunk[3] = (val >>> 24);
+ }
+
+ private void fastUnpack4(int byteOffset) {
+ int val = _dataBuffer.getInt(byteOffset);
+ unpackedChunk[0] = ((val) & 15);
+ unpackedChunk[1] = ((val >>> 4) & 15);
+ unpackedChunk[2] = ((val >>> 8) & 15);
+ unpackedChunk[3] = ((val >>> 12) & 15);
+ unpackedChunk[4] = ((val >>> 16) & 15);
+ unpackedChunk[5] = ((val >>> 20) & 15);
+ unpackedChunk[6] = ((val >>> 24) & 15);
+ unpackedChunk[7] = (val >>> 28);
+ }
+
+ private void fastUnpack2(int byteOffset) {
+ int val = _dataBuffer.getInt(byteOffset);
+ unpackedChunk[0] = (val & 3);
+ unpackedChunk[1] = ((val >>> 2) & 3);
+ unpackedChunk[2] = ((val >>> 4) & 3);
+ unpackedChunk[3] = ((val >>> 6) & 3);
+ unpackedChunk[4] = ((val >>> 8) & 3);
+ unpackedChunk[5] = ((val >>> 10) & 3);
+ unpackedChunk[6] = ((val >>> 12) & 3);
+ unpackedChunk[7] = ((val >>> 14) & 3);
+ unpackedChunk[8] = ((val >>> 16) & 3);
+ unpackedChunk[9] = ((val >>> 18) & 3);
+ unpackedChunk[10] = ((val >>> 20) & 3);
+ unpackedChunk[11] = ((val >>> 22) & 3);
+ unpackedChunk[12] = ((val >>> 24) & 3);
+ unpackedChunk[13]= ((val >>> 26) & 3);
+ unpackedChunk[14] = ((val >>> 28) & 3);
+ unpackedChunk[15] = (val >>> 30);
+ }
+
+ public void writeInt(int index, int value) {
+ if (index > 0 && index % PACK_BOUNDARY == 0) {
+ // try to pack 32 integer values at a time
+ BitPacking.fastpack(chunkToPack, 0, packedChunk, 0, _numBitsPerValue);
+ for (int i = 0; i < _numBitsPerValue; i++) {
+ _dataBuffer.putInt(_offset * Integer.BYTES, packedChunk[i]);
+ _offset++;
+ }
+ }
+ chunkToPack[currentIndexInChunk] = value;
+ currentIndexInChunk = (currentIndexInChunk + 1) % PACK_BOUNDARY;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedBitSingleValueWriter.java
b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedBitSingleValueWriter.java
index 7506293..b81b63c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedBitSingleValueWriter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedBitSingleValueWriter.java
@@ -21,13 +21,14 @@ package org.apache.pinot.core.io.writer.impl.v1;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
+import org.apache.pinot.core.io.util.FastFixedBitIntReaderWriter;
import org.apache.pinot.core.io.util.FixedBitIntReaderWriter;
import org.apache.pinot.core.io.writer.SingleColumnSingleValueWriter;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
public class FixedBitSingleValueWriter implements
SingleColumnSingleValueWriter {
- private FixedBitIntReaderWriter dataFileWriter;
+ private FastFixedBitIntReaderWriter dataFileWriter;
public FixedBitSingleValueWriter(File file, int rows, int columnSizeInBits)
throws Exception {
@@ -36,7 +37,7 @@ public class FixedBitSingleValueWriter implements
SingleColumnSingleValueWriter
// Backward-compatible: index file is always big-endian
PinotDataBuffer dataBuffer =
PinotDataBuffer.mapFile(file, false, 0, length, ByteOrder.BIG_ENDIAN,
getClass().getSimpleName());
- dataFileWriter = new FixedBitIntReaderWriter(dataBuffer, rows,
columnSizeInBits);
+ dataFileWriter = new FastFixedBitIntReaderWriter(dataBuffer, rows,
columnSizeInBits);
}
@Override
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/ForwardIndexBenchmark.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/ForwardIndexBenchmark.java
index d6c5f49..db8a342 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/ForwardIndexBenchmark.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/ForwardIndexBenchmark.java
@@ -143,13 +143,13 @@ public class ForwardIndexBenchmark {
static void readPinotFwdIndex()
throws IOException {
PinotDataBuffer pinotDataBuffer =
PinotDataBuffer.loadBigEndianFile(pinotOutFile);
- FixedBitSingleValueReader reader = new
FixedBitSingleValueReader(pinotDataBuffer, ROWS, NUM_BITS);
+ FixedBitIntReaderWriter reader = new
FixedBitIntReaderWriter(pinotDataBuffer, ROWS, NUM_BITS);
Stopwatch stopwatch = Stopwatch.createUnstarted();
int[] unpacked = new int[32];
stopwatch.start();
// sequentially unpack 32 integers at a time
for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
- reader.readBulk(startIndex, 32, unpacked);
+ reader.readInt(startIndex, 32, unpacked);
}
stopwatch.stop();
System.out.println("pinot took: " +
stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]