This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch use-tr13
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7ebb53397cd5ba69a50f4d315953e847c211733f
Author: Jack Li(Analytics Engineering) <[email protected]>
AuthorDate: Wed Apr 17 15:24:00 2019 -0700

    Use tr13
---
 pinot-core/pom.xml                                 |   4 +-
 .../index/column/PhysicalColumnIndexContainer.java |   6 +-
 .../readers/OnHeapTrieBasedStringDictionary.java   | 104 +++++++++++++++++----
 .../pinot/perf/StringDictionaryPerfTest.java       |  21 ++++-
 pom.xml                                            |   6 +-
 5 files changed, 112 insertions(+), 29 deletions(-)

diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index ee7df05..2ddf510 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -173,8 +173,8 @@
       <artifactId>jopt-simple</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-collections4</artifactId>
+      <groupId>com.ning</groupId>
+      <artifactId>tr13</artifactId> <!-- NOTE(review): replaces commons-collections4 (previously used for PatriciaTrie); confirm no other pinot-core class still imports org.apache.commons.collections4. -->
     </dependency>
 
     <!-- Kafka -->
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index 459c457..362e333 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -173,7 +173,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
         int numBytesPerValue = metadata.getColumnMaxLength();
         byte paddingByte = (byte) metadata.getPaddingCharacter();
         if (loadOnHeapTrieBasedDictionary) {
-          return new OnHeapTrieBasedStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte);
+          try {
+            return new OnHeapTrieBasedStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
         } else if (loadOnHeap) {
           return new OnHeapStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte);
         } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java
index dbc0e41..42b528c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/OnHeapTrieBasedStringDictionary.java
@@ -18,62 +18,91 @@
  */
 package org.apache.pinot.core.segment.index.readers;
 
+import com.ning.tr13.KeyValueSource;
+import com.ning.tr13.TrieLookup;
+import com.ning.tr13.build.SimpleTrieBuilder;
+import com.ning.tr13.impl.bytes.ByteBufferBytesTrieLookup;
+import com.ning.tr13.impl.bytes.SimpleBytesTrieBuilder;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
-import org.apache.commons.collections4.trie.PatriciaTrie;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class OnHeapTrieBasedStringDictionary extends OnHeapTrieBasedDictionary {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OnHeapTrieBasedStringDictionary.class);
   private final byte _paddingByte;
   private final String[] _unpaddedStrings;
   private final String[] _paddedStrings;
-  private final PatriciaTrie<Integer> _paddedStringToIdTrie;
-  private final PatriciaTrie<Integer> _unpaddedStringToIdTrie;
+  // Maps the UTF-8 bytes of each dictionary value to its dictionary id, stored as a 4-byte big-endian int.
+  // Keys are the unpadded values when _paddingByte == 0 and the padded values otherwise (see IndexReader),
+  // matching the previous unpadded/padded PatriciaTrie maps.
+  private final TrieLookup<byte[]> _trieLookup;
 
   public OnHeapTrieBasedStringDictionary(PinotDataBuffer dataBuffer, int length, int numBytesPerValue,
-      byte paddingByte) {
+      byte paddingByte) throws IOException {
     super(dataBuffer, length, numBytesPerValue, paddingByte);
 
     _paddingByte = paddingByte;
     byte[] buffer = new byte[numBytesPerValue];
     _unpaddedStrings = new String[length];
-    _unpaddedStringToIdTrie = new PatriciaTrie<>();
+    // Build the trie once into an in-memory byte array, then wrap that array for lookups.
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    SimpleBytesTrieBuilder simpleBytesTrieBuilder = new SimpleBytesTrieBuilder(new IndexReader(dataBuffer, length, numBytesPerValue, paddingByte));
+    try {
+      simpleBytesTrieBuilder.buildAndWrite(byteArrayOutputStream, false);
+    } catch (IOException e) {
+      LOGGER.error("Exception when building trie tree!", e);
+      throw e;
+    }
+
+    byte[] raw = byteArrayOutputStream.toByteArray();
+    _trieLookup = new ByteBufferBytesTrieLookup(ByteBuffer.wrap(raw), raw.length);
+    // Decoded values back getStringValue() and the binary search in insertionIndexOf().
 
     for (int i = 0; i < length; i++) {
       _unpaddedStrings[i] = getUnpaddedString(i, buffer);
-      _unpaddedStringToIdTrie.put(_unpaddedStrings[i], i);
+      // Equality lookups are served by _trieLookup, so no per-value map insert is needed here.
     }
 
     if (_paddingByte == 0) {
       _paddedStrings = null;
-      _paddedStringToIdTrie = null;
+      // Padded strings are only materialized when a non-zero padding byte is in use.
     } else {
       _paddedStrings = new String[length];
-      _paddedStringToIdTrie = new PatriciaTrie<>();
+      // Sorted padded values, binary-searched by insertionIndexOf().
 
       for (int i = 0; i < length; i++) {
         _paddedStrings[i] = getPaddedString(i, buffer);
-        _paddedStringToIdTrie.put(_paddedStrings[i], i);
+        // As with the unpadded values, equality lookups go through _trieLookup.
       }
     }
   }
 
   @Override
   public int indexOf(Object rawValue) {
-    PatriciaTrie<Integer> stringToIdTrie = (_paddingByte == 0) ? _unpaddedStringToIdTrie : _paddedStringToIdTrie;
-    Integer index = stringToIdTrie.get(rawValue);
-    return (index != null) ? index : -1;
+    // Exact-match lookup through the trie. As with the previous maps, the value is matched unpadded when
+    // _paddingByte == 0 and as-is otherwise, so in the padded case callers must pass an already-padded string.
+    // The trie value is the dictionary id encoded as a 4-byte big-endian int.
+    // Returns -1 when the value is not present.
+    byte[] encodedId = _trieLookup.findValue(StringUtil.encodeUtf8((String) rawValue));
+    return (encodedId != null) ? convertByteArrayToInt(encodedId) : -1;
   }
 
   @Override
   public int insertionIndexOf(Object rawValue) {
-    if (_paddingByte == 0) {
-      Integer id = _unpaddedStringToIdTrie.get(rawValue);
-      return (id != null) ? id : Arrays.binarySearch(_unpaddedStrings, rawValue);
-    } else {
-      String paddedValue = padString((String) rawValue);
-      return Arrays.binarySearch(_paddedStrings, paddedValue);
-    }
+    // Contract: the dictionary id if present, otherwise -(insertion point) - 1 (Arrays.binarySearch semantics).
+    if (_paddingByte == 0) {
+      int id = indexOf(rawValue);
+      return (id >= 0) ? id : Arrays.binarySearch(_unpaddedStrings, rawValue);
+    } else {
+      String paddedValue = padString((String) rawValue);
+      return Arrays.binarySearch(_paddedStrings, paddedValue);
+    }
   }
 
   @Override
@@ -85,4 +114,41 @@ public class OnHeapTrieBasedStringDictionary extends OnHeapTrieBasedDictionary {
   public String getStringValue(int dictId) {
     return _unpaddedStrings[dictId];
   }
+
+  // Feeds (value bytes -> 4-byte big-endian dictionary id) pairs to the tr13 trie builder, in dictionary id order.
+  private class IndexReader extends KeyValueSource<byte[]> {
+    private final PinotDataBuffer _dataBuffer;
+    private final int _length;
+    private final int _numBytesPerValue;
+    private final byte _paddingByte;
+    private int _lineNumber;
+
+    public IndexReader(PinotDataBuffer dataBuffer, int length, int numBytesPerValue,
+        byte paddingByte) {
+      _dataBuffer = dataBuffer;
+      _length = length;
+      _numBytesPerValue = numBytesPerValue;
+      _paddingByte = paddingByte;
+    }
+
+    @Override
+    public void readAll(ValueCallback<byte[]> handler) throws IOException {
+      byte[] buffer = new byte[_numBytesPerValue];
+      for (int i = 0; i < _length; i++) {
+        ++_lineNumber;
+        String value = (_paddingByte == 0) ? getUnpaddedString(i, buffer) : getPaddedString(i, buffer);
+        handler.handleEntry(StringUtil.encodeUtf8(value), ByteBuffer.allocate(Integer.BYTES).putInt(i).array());
+      }
+    }
+
+    @Override
+    public int getLineNumber() {
+      return _lineNumber;
+    }
+  }
+
+  private static int convertByteArrayToInt(byte[] bytes) {
+    ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+    return wrapped.getInt();
+  }
 }
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
index c703a3e..9a11a8d 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 
 
@@ -66,6 +67,7 @@ public class StringDictionaryPerfTest {
    */
   public void buildSegment(int dictLength)
       throws Exception {
+    long startTime = System.currentTimeMillis();
     Schema schema = new Schema();
     String segmentName = "perfTestSegment" + System.currentTimeMillis();
     _indexDir = new File(TMP_DIR + File.separator + segmentName);
@@ -107,6 +109,7 @@ public class StringDictionaryPerfTest {
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     driver.init(config, new GenericRowRecordReader(rows, schema));
     driver.build();
+    System.out.println("Time to take for building segments: " + (System.currentTimeMillis() - startTime));
   }
 
   /**
@@ -116,16 +119,25 @@ public class StringDictionaryPerfTest {
    * @param numLookups Number of lookups to perform
    * @throws Exception
    */
-  public void perfTestLookups(int numLookups)
+  public void perfTestLookups(int numLookups, boolean trieBased)
       throws Exception {
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(_indexDir, ReadMode.heap);
+    IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
+    defaultIndexLoadingConfig.setReadMode(ReadMode.heap);
+    Set<String> indexedColumnNames = new HashSet<>();
+    indexedColumnNames.add(COLUMN_NAME);
+    if (trieBased) {
+      defaultIndexLoadingConfig.setOnHeapTrieBasedDictionaryColumns(indexedColumnNames);
+    } else {
+      defaultIndexLoadingConfig.setOnHeapDictionaryColumns(indexedColumnNames);
+    }
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(_indexDir, defaultIndexLoadingConfig);
     Dictionary dictionary = immutableSegment.getDictionary(COLUMN_NAME);
 
     Random random = new Random(System.nanoTime());
     long start = System.currentTimeMillis();
 
     for (int i = 0; i < numLookups; i++) {
-      int index = 1 + random.nextInt(_dictLength);
+      int index = random.nextInt(_dictLength);
       dictionary.indexOf(_inputStrings[index]);
     }
 
@@ -141,9 +153,10 @@ public class StringDictionaryPerfTest {
 
     int dictLength = Integer.valueOf(args[0]);
     int numLookups = Integer.valueOf(args[1]);
+    boolean trieBased = (args.length > 2) && Boolean.parseBoolean(args[2]);
 
     StringDictionaryPerfTest test = new StringDictionaryPerfTest();
     test.buildSegment(dictLength);
-    test.perfTestLookups(numLookups);
+    test.perfTestLookups(numLookups, trieBased);
   }
 }
diff --git a/pom.xml b/pom.xml
index ced7c84..e37e657 100644
--- a/pom.xml
+++ b/pom.xml
@@ -806,9 +806,9 @@
         <version>0.9.2</version>
       </dependency>
       <dependency>
-        <groupId>org.apache.commons</groupId>
-        <artifactId>commons-collections4</artifactId>
-        <version>4.3</version>
+        <groupId>com.ning</groupId>
+        <artifactId>tr13</artifactId>
+        <version>0.3.0</version> <!-- NOTE(review): commons-collections4 is removed from this managed list; any module still declaring it without an explicit version would no longer resolve. Confirm none do. -->
       </dependency>
     </dependencies>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to