siddharthteotia commented on a change in pull request #5409: URL: https://github.com/apache/incubator-pinot/pull/5409#discussion_r430205187
########## File path: pinot-core/src/main/java/org/apache/pinot/core/io/util/FixedBitIntReaderWriterV2.java ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.io.util; + +import com.google.common.base.Preconditions; +import java.io.Closeable; +import java.io.IOException; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +public final class FixedBitIntReaderWriterV2 implements Closeable { + private volatile PinotDataBitSetV2 _dataBitSet; + private final int _numBitsPerValue; + + public FixedBitIntReaderWriterV2(PinotDataBuffer dataBuffer, int numValues, int numBitsPerValue) { + Preconditions + .checkState(dataBuffer.size() == (int) (((long) numValues * numBitsPerValue + Byte.SIZE - 1) / Byte.SIZE)); + _dataBitSet = PinotDataBitSetV2.createBitSet(dataBuffer, numBitsPerValue); + _numBitsPerValue = numBitsPerValue; + } + + /** + * Read dictionaryId for a particular docId + * @param index docId to get the dictionaryId for + * @return dictionaryId + */ + public int readInt(int index) { + return _dataBitSet.readInt(index); + } + + /** + * Array based API to read dictionaryIds for a 
contiguous + * range of docIds starting at startDocId for a given length + * @param startDocId docId range start + * @param length length of contiguous docId range + * @param buffer out buffer to read dictionaryIds into + */ + public void readInt(int startDocId, int length, int[] buffer) { + _dataBitSet.readInt(startDocId, length, buffer); + } + + /** + * Array based API to read dictionaryIds for an array of docIds + * which are monotonically increasing but not necessarily contiguous + * @param docIds array of docIds to read the dictionaryIds for + * @param docIdStartIndex start index in docIds array + * @param docIdLength length to process in docIds array + * @param values out array to store the dictionaryIds into + * @param valuesStartIndex start index in values array + */ + public void readValues(int[] docIds, int docIdStartIndex, int docIdLength, int[] values, int valuesStartIndex) { + int docIdEndIndex = docIdStartIndex + docIdLength - 1; + if (shouldBulkRead(docIds, docIdStartIndex, docIdEndIndex)) { + _dataBitSet.readInt(docIds, docIdStartIndex, docIdLength, values, valuesStartIndex); + } else { + for (int i = docIdStartIndex; i <= docIdEndIndex; i++) { + values[valuesStartIndex++] = _dataBitSet.readInt(docIds[i]); + } + } + } + + private boolean shouldBulkRead(int[] docIds, int startIndex, int endIndex) { + int numDocsToRead = endIndex - startIndex + 1; + int docIdRange = docIds[endIndex] - docIds[startIndex] + 1; + if (docIdRange > DocIdSetPlanNode.MAX_DOC_PER_CALL) { Review comment: I think this is coming from the previous commit. The latest version of the PR doesn't have this check. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/io/util/FixedBitIntReaderWriterV2.java ########## @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.io.util; + +import com.google.common.base.Preconditions; +import java.io.Closeable; +import java.io.IOException; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +public final class FixedBitIntReaderWriterV2 implements Closeable { + private volatile PinotDataBitSetV2 _dataBitSet; Review comment: done ########## File path: pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java ########## @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.io.util; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +public abstract class PinotDataBitSetV2 implements Closeable { + private static final int BYTE_MASK = 0xFF; + static final int MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ = 16; // comes from 2-bit encoding + + private static final ThreadLocal<int[]> THREAD_LOCAL_DICT_IDS = + ThreadLocal.withInitial(() -> new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]); + + protected PinotDataBuffer _dataBuffer; + protected int _numBitsPerValue; + + /** + * Unpack single dictId at the given docId. This is efficient Review comment: done ########## File path: pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java ########## @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.io.util; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +public abstract class PinotDataBitSetV2 implements Closeable { + private static final int BYTE_MASK = 0xFF; + static final int MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ = 16; // comes from 2-bit encoding + + private static final ThreadLocal<int[]> THREAD_LOCAL_DICT_IDS = + ThreadLocal.withInitial(() -> new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]); + + protected PinotDataBuffer _dataBuffer; + protected int _numBitsPerValue; + + /** + * Unpack single dictId at the given docId. This is efficient + * because of simplified bitmath. + * @param index docId + * @return unpacked dictId + */ + public abstract int readInt(int index); + + /** + * Unpack dictIds for a contiguous range of docIds represented by startIndex + * and length. This uses vectorization as much as possible for all the aligned + * reads and also takes care of the small byte-sized window of unaligned read. + * @param startIndex start docId + * @param length length + * @param out out array to store the unpacked dictIds + */ + public abstract void readInt(int startIndex, int length, int[] out); + + /** + * Unpack dictIds for an array of docIds[] which is not necessarily + * contiguous. So there could be gaps in the array: + * e.g: [1, 3, 7, 9, 11, 12] + * The actual read is done by the previous API since that is efficient + * as it exploits contiguity and uses vectorization. 
However, since + * the out[] array has to be correctly populated with the unpacked dictId + * for each docId, a post-processing step is needed after the bulk contiguous + * read to correctly set the unpacked dictId into the out array throwing away + * the unnecessary dictIds unpacked as part of contiguous read + * @param docIds docIds array + * @param docIdsStartIndex starting index in the docIds array + * @param length length to read (number of docIds to read in the array) + * @param out out array to store the unpacked dictIds + * @param outpos starting index in the out array + */ + public void readInt(int[] docIds, int docIdsStartIndex, int length, int[] out, int outpos) { + int startDocId = docIds[docIdsStartIndex]; + int endDocId = docIds[docIdsStartIndex + length - 1]; + int[] dictIds = THREAD_LOCAL_DICT_IDS.get(); + // do a contiguous bulk read + readInt(startDocId, endDocId - startDocId + 1, dictIds); Review comment: See the outer API in FixedBitIntReaderWriterV2 that calls this. That API tries to judge sparseness before deciding to do a bulk read. The decision is made on a chunk of values at a time. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java ########## @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.io.util; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +public abstract class PinotDataBitSetV2 implements Closeable { + private static final int BYTE_MASK = 0xFF; + static final int MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ = 16; // comes from 2-bit encoding + + private static final ThreadLocal<int[]> THREAD_LOCAL_DICT_IDS = + ThreadLocal.withInitial(() -> new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]); + + protected PinotDataBuffer _dataBuffer; + protected int _numBitsPerValue; + + /** + * Unpack single dictId at the given docId. This is efficient + * because of simplified bitmath. + * @param index docId + * @return unpacked dictId + */ + public abstract int readInt(int index); + + /** + * Unpack dictIds for a contiguous range of docIds represented by startIndex + * and length. This uses vectorization as much as possible for all the aligned + * reads and also takes care of the small byte-sized window of unaligned read. + * @param startIndex start docId + * @param length length + * @param out out array to store the unpacked dictIds + */ + public abstract void readInt(int startIndex, int length, int[] out); + + /** + * Unpack dictIds for an array of docIds[] which is not necessarily + * contiguous. So there could be gaps in the array: + * e.g: [1, 3, 7, 9, 11, 12] + * The actual read is done by the previous API since that is efficient + * as it exploits contiguity and uses vectorization. 
However, since + * the out[] array has to be correctly populated with the unpacked dictId + * for each docId, a post-processing step is needed after the bulk contiguous + * read to correctly set the unpacked dictId into the out array throwing away + * the unnecessary dictIds unpacked as part of contiguous read + * @param docIds docIds array + * @param docIdsStartIndex starting index in the docIds array + * @param length length to read (number of docIds to read in the array) + * @param out out array to store the unpacked dictIds + * @param outpos starting index in the out array + */ + public void readInt(int[] docIds, int docIdsStartIndex, int length, int[] out, int outpos) { Review comment: > For the benchmark, you should also compare the worst case scenario such as 3, 5, 9, 17 bits. I am still working on adding faster methods for non-power-of-2 bit widths. A follow-up will address this. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java ########## @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.io.util; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +public abstract class PinotDataBitSetV2 implements Closeable { + private static final int BYTE_MASK = 0xFF; + static final int MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ = 16; // comes from 2-bit encoding + + private static final ThreadLocal<int[]> THREAD_LOCAL_DICT_IDS = + ThreadLocal.withInitial(() -> new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]); + + protected PinotDataBuffer _dataBuffer; + protected int _numBitsPerValue; + + /** + * Unpack single dictId at the given docId. This is efficient + * because of simplified bitmath. + * @param index docId + * @return unpacked dictId + */ + public abstract int readInt(int index); + + /** + * Unpack dictIds for a contiguous range of docIds represented by startIndex + * and length. This uses vectorization as much as possible for all the aligned + * reads and also takes care of the small byte-sized window of unaligned read. + * @param startIndex start docId + * @param length length + * @param out out array to store the unpacked dictIds + */ + public abstract void readInt(int startIndex, int length, int[] out); + + /** + * Unpack dictIds for an array of docIds[] which is not necessarily + * contiguous. So there could be gaps in the array: + * e.g: [1, 3, 7, 9, 11, 12] + * The actual read is done by the previous API since that is efficient + * as it exploits contiguity and uses vectorization. 
However, since + * the out[] array has to be correctly populated with the unpacked dictId + * for each docId, a post-processing step is needed after the bulk contiguous + * read to correctly set the unpacked dictId into the out array throwing away + * the unnecessary dictIds unpacked as part of contiguous read + * @param docIds docIds array + * @param docIdsStartIndex starting index in the docIds array + * @param length length to read (number of docIds to read in the array) + * @param out out array to store the unpacked dictIds + * @param outpos starting index in the out array + */ + public void readInt(int[] docIds, int docIdsStartIndex, int length, int[] out, int outpos) { + int startDocId = docIds[docIdsStartIndex]; + int endDocId = docIds[docIdsStartIndex + length - 1]; + int[] dictIds = THREAD_LOCAL_DICT_IDS.get(); + // do a contiguous bulk read + readInt(startDocId, endDocId - startDocId + 1, dictIds); + out[outpos] = dictIds[0]; + // set the unpacked dictId correctly. this is needed since there could + // be gaps and some dictIds may have to be thrown/ignored. 
+ for (int i = 1; i < length; i++) { + out[outpos + i] = dictIds[docIds[docIdsStartIndex + i] - startDocId]; + } + } + + public static PinotDataBitSetV2 createBitSet(PinotDataBuffer pinotDataBuffer, int numBitsPerValue) { + switch (numBitsPerValue) { + case 2: + return new Bit2Encoded(pinotDataBuffer, numBitsPerValue); + case 4: + return new Bit4Encoded(pinotDataBuffer, numBitsPerValue); + case 8: + return new Bit8Encoded(pinotDataBuffer, numBitsPerValue); + case 16: + return new Bit16Encoded(pinotDataBuffer, numBitsPerValue); + case 32: + return new RawInt(pinotDataBuffer, numBitsPerValue); + default: + throw new UnsupportedOperationException(numBitsPerValue + "not supported by PinotDataBitSetV2"); + } + } + + public static class Bit2Encoded extends PinotDataBitSetV2 { + Bit2Encoded(PinotDataBuffer dataBuffer, int numBits) { + _dataBuffer = dataBuffer; + _numBitsPerValue = numBits; + } + + @Override + public int readInt(int index) { + long bitOffset = (long) index * _numBitsPerValue; + int byteOffset = (int) (bitOffset / Byte.SIZE); Review comment: done ########## File path: pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java ########## @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.io.util; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +public abstract class PinotDataBitSetV2 implements Closeable { + private static final int BYTE_MASK = 0xFF; + static final int MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ = 16; // comes from 2-bit encoding + + private static final ThreadLocal<int[]> THREAD_LOCAL_DICT_IDS = + ThreadLocal.withInitial(() -> new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]); + + protected PinotDataBuffer _dataBuffer; + protected int _numBitsPerValue; + + /** + * Unpack single dictId at the given docId. This is efficient + * because of simplified bitmath. + * @param index docId + * @return unpacked dictId + */ + public abstract int readInt(int index); + + /** + * Unpack dictIds for a contiguous range of docIds represented by startIndex + * and length. This uses vectorization as much as possible for all the aligned + * reads and also takes care of the small byte-sized window of unaligned read. + * @param startIndex start docId + * @param length length + * @param out out array to store the unpacked dictIds + */ + public abstract void readInt(int startIndex, int length, int[] out); + + /** + * Unpack dictIds for an array of docIds[] which is not necessarily + * contiguous. So there could be gaps in the array: + * e.g: [1, 3, 7, 9, 11, 12] + * The actual read is done by the previous API since that is efficient + * as it exploits contiguity and uses vectorization. 
However, since + * the out[] array has to be correctly populated with the unpacked dictId + * for each docId, a post-processing step is needed after the bulk contiguous + * read to correctly set the unpacked dictId into the out array throwing away + * the unnecessary dictIds unpacked as part of contiguous read + * @param docIds docIds array + * @param docIdsStartIndex starting index in the docIds array + * @param length length to read (number of docIds to read in the array) + * @param out out array to store the unpacked dictIds + * @param outpos starting index in the out array + */ + public void readInt(int[] docIds, int docIdsStartIndex, int length, int[] out, int outpos) { Review comment: See the outer API in FixedBitIntReaderWriterV2 that calls this. That API tries to judge sparseness before deciding to do a bulk read. The decision is made on a chunk of values at a time, and this bulk API is called for each chunk. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
