This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch use-tr13 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7ebb53397cd5ba69a50f4d315953e847c211733f Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Wed Apr 17 15:24:00 2019 -0700 Use tr13 --- pinot-core/pom.xml | 4 +- .../index/column/PhysicalColumnIndexContainer.java | 6 +- .../readers/OnHeapTrieBasedStringDictionary.java | 104 +++++++++++++++++---- .../pinot/perf/StringDictionaryPerfTest.java | 21 ++++- pom.xml | 6 +- 5 files changed, 112 insertions(+), 29 deletions(-) diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml index ee7df05..2ddf510 100644 --- a/pinot-core/pom.xml +++ b/pinot-core/pom.xml @@ -173,8 +173,8 @@ <artifactId>jopt-simple</artifactId> </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-collections4</artifactId> + <groupId>com.ning</groupId> + <artifactId>tr13</artifactId> </dependency> <!-- Kafka --> diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java index 459c457..362e333 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java @@ -173,7 +173,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer int numBytesPerValue = metadata.getColumnMaxLength(); byte paddingByte = (byte) metadata.getPaddingCharacter(); if (loadOnHeapTrieBasedDictionary) { - return new OnHeapTrieBasedStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte); + try { + return new OnHeapTrieBasedStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte); + } catch (IOException e) { + throw new RuntimeException(e.toString()); + } } else if (loadOnHeap) { return new OnHeapStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte); } else { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java index dbc0e41..42b528c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java @@ -18,62 +18,91 @@ */ package org.apache.pinot.core.segment.index.readers; +import com.ning.tr13.KeyValueSource; +import com.ning.tr13.TrieLookup; +import com.ning.tr13.build.SimpleTrieBuilder; +import com.ning.tr13.impl.bytes.ByteBufferBytesTrieLookup; +import com.ning.tr13.impl.bytes.SimpleBytesTrieBuilder; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; -import org.apache.commons.collections4.trie.PatriciaTrie; +import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class OnHeapTrieBasedStringDictionary extends OnHeapTrieBasedDictionary { + private static final Logger LOGGER = LoggerFactory.getLogger(OnHeapTrieBasedStringDictionary.class); private final byte _paddingByte; private final String[] _unpaddedStrings; private final String[] _paddedStrings; - private final PatriciaTrie<Integer> _paddedStringToIdTrie; - private final PatriciaTrie<Integer> _unpaddedStringToIdTrie; + private final TrieLookup<byte[]> _trieLookup; + +// private final PatriciaTrie<Integer> _paddedStringToIdTrie; +// private final PatriciaTrie<Integer> _unpaddedStringToIdTrie; public OnHeapTrieBasedStringDictionary(PinotDataBuffer dataBuffer, int length, int numBytesPerValue, - byte paddingByte) { + byte paddingByte) throws IOException { super(dataBuffer, length, numBytesPerValue, paddingByte); _paddingByte = paddingByte; byte[] buffer = new byte[numBytesPerValue]; _unpaddedStrings = new String[length]; - _unpaddedStringToIdTrie = new PatriciaTrie<>(); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + SimpleBytesTrieBuilder simpleBytesTrieBuilder = new SimpleBytesTrieBuilder(new IndexReader(dataBuffer, length, numBytesPerValue, paddingByte)); + try { + simpleBytesTrieBuilder.buildAndWrite(byteArrayOutputStream, false); + } catch (IOException e) { + LOGGER.error("Exception when building trie tree!", e); + throw e; + } + + byte[] raw = byteArrayOutputStream.toByteArray(); + _trieLookup = new ByteBufferBytesTrieLookup(ByteBuffer.wrap(raw), raw.length); +// _unpaddedStringToIdTrie = new PatriciaTrie<>(); for (int i = 0; i < length; i++) { _unpaddedStrings[i] = getUnpaddedString(i, buffer); - _unpaddedStringToIdTrie.put(_unpaddedStrings[i], i); +// _unpaddedStringToIdTrie.put(_unpaddedStrings[i], i); } if (_paddingByte == 0) { _paddedStrings = null; - _paddedStringToIdTrie = null; +// _paddedStringToIdTrie = null; } else { _paddedStrings = new String[length]; - _paddedStringToIdTrie = new PatriciaTrie<>(); +// _paddedStringToIdTrie = new PatriciaTrie<>(); for (int i = 0; i < length; i++) { _paddedStrings[i] = getPaddedString(i, buffer); - _paddedStringToIdTrie.put(_paddedStrings[i], i); +// _paddedStringToIdTrie.put(_paddedStrings[i], i); } } } @Override public int indexOf(Object rawValue) { - PatriciaTrie<Integer> stringToIdTrie = (_paddingByte == 0) ? _unpaddedStringToIdTrie : _paddedStringToIdTrie; - Integer index = stringToIdTrie.get(rawValue); - return (index != null) ? index : -1; + + byte[] index = _trieLookup.findValue(StringUtil.encodeUtf8((String) rawValue)); + return (index != null) ? convertByteArrayToInt(index) : -1; +// PatriciaTrie<Integer> stringToIdTrie = (_paddingByte == 0) ? _unpaddedStringToIdTrie : _paddedStringToIdTrie; +// Integer index = stringToIdTrie.get(rawValue); +// return (index != null) ? index : -1; } @Override public int insertionIndexOf(Object rawValue) { - if (_paddingByte == 0) { - Integer id = _unpaddedStringToIdTrie.get(rawValue); - return (id != null) ? id : Arrays.binarySearch(_unpaddedStrings, rawValue); - } else { - String paddedValue = padString((String) rawValue); - return Arrays.binarySearch(_paddedStrings, paddedValue); - } + return indexOf(rawValue); +// if (_paddingByte == 0) { +// Integer id = _unpaddedStringToIdTrie.get(rawValue); +// return (id != null) ? id : Arrays.binarySearch(_unpaddedStrings, rawValue); +// } else { +// String paddedValue = padString((String) rawValue); +// return Arrays.binarySearch(_paddedStrings, paddedValue); +// } } @Override @@ -85,4 +114,41 @@ public class OnHeapTrieBasedStringDictionary extends OnHeapTrieBasedDictionary { public String getStringValue(int dictId) { return _unpaddedStrings[dictId]; } + + + private class IndexReader extends KeyValueSource<byte[]> { + private PinotDataBuffer _dataBuffer; + private int _length; + private int _numBytesPerValue; + private byte _paddingByte; + private int _lineNumber; + + public IndexReader(PinotDataBuffer dataBuffer, int length, int numBytesPerValue, + byte paddingByte) { + _dataBuffer = dataBuffer; + _length = length; + _numBytesPerValue = numBytesPerValue; + _paddingByte = paddingByte; + } + + @Override + public void readAll(ValueCallback<byte[]> handler) throws IOException { + for (int i = 0; i < _length; i++) { + ++_lineNumber; + byte[] buffer = new byte[_numBytesPerValue]; + byte[] value = ByteBuffer.allocate(4).putInt(i).array(); + handler.handleEntry(getBytes(i, buffer), value); + } + } + + @Override + public int getLineNumber() { + return _lineNumber; + } + } + + private int convertByteArrayToInt(byte[] bytes) { + ByteBuffer wrapped = ByteBuffer.wrap(bytes); + return wrapped.getInt(); + } } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java index c703a3e..9a11a8d 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java @@ -38,6 +38,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.core.segment.index.readers.Dictionary; @@ -66,6 +67,7 @@ public class StringDictionaryPerfTest { */ public void buildSegment(int dictLength) throws Exception { + long startTime = System.currentTimeMillis(); Schema schema = new Schema(); String segmentName = "perfTestSegment" + System.currentTimeMillis(); _indexDir = new File(TMP_DIR + File.separator + segmentName); @@ -107,6 +109,7 @@ public class StringDictionaryPerfTest { SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(config, new GenericRowRecordReader(rows, schema)); driver.build(); + System.out.println("Time to take for building segments: " + (System.currentTimeMillis() - startTime)); } /** @@ -116,16 +119,25 @@ public class StringDictionaryPerfTest { * @param numLookups Number of lookups to perform * @throws Exception */ - public void perfTestLookups(int numLookups) + public void perfTestLookups(int numLookups, boolean trieBased) throws Exception { - ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(_indexDir, ReadMode.heap); + IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig(); + defaultIndexLoadingConfig.setReadMode(ReadMode.heap); + Set<String> indexedColumnNames = new HashSet<>(); + indexedColumnNames.add(COLUMN_NAME); + if (trieBased) { + defaultIndexLoadingConfig.setOnHeapTrieBasedDictionaryColumns(indexedColumnNames); + } else { + defaultIndexLoadingConfig.setOnHeapDictionaryColumns(indexedColumnNames); + } + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(_indexDir, defaultIndexLoadingConfig); Dictionary dictionary = immutableSegment.getDictionary(COLUMN_NAME); Random random = new Random(System.nanoTime()); long start = System.currentTimeMillis(); for (int i = 0; i < numLookups; i++) { - int index = 1 + random.nextInt(_dictLength); + int index = random.nextInt(_dictLength); dictionary.indexOf(_inputStrings[index]); } @@ -141,9 +153,10 @@ public class StringDictionaryPerfTest { int dictLength = Integer.valueOf(args[0]); int numLookups = Integer.valueOf(args[1]); + boolean trieBased = Boolean.parseBoolean(args[2]); StringDictionaryPerfTest test = new StringDictionaryPerfTest(); test.buildSegment(dictLength); - test.perfTestLookups(numLookups); + test.perfTestLookups(numLookups, trieBased); } } diff --git a/pom.xml b/pom.xml index ced7c84..e37e657 100644 --- a/pom.xml +++ b/pom.xml @@ -806,9 +806,9 @@ <version>0.9.2</version> </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-collections4</artifactId> - <version>4.3</version> + <groupId>com.ning</groupId> + <artifactId>tr13</artifactId> + <version>0.3.0</version> </dependency> </dependencies> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
