This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 76d0eb25db Reduce Heap Usage of OnHeapStringDictionary (#12223)
76d0eb25db is described below

commit 76d0eb25db180f1bf88933146bc5e2e25f66f8df
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Thu Jan 25 12:51:45 2024 -0800

    Reduce Heap Usage of OnHeapStringDictionary (#12223)
    
    * Add Interning capability for OnHeapStringDictionary
    
    * Adding tests and some fixes
    
    * Remove support for older indexingConfig
    
    * Address review comments
---
 .../pinot/common/utils/FALFInternerTest.java       | 168 +++++++++++++++++++++
 .../index/dictionary/DictionaryIndexType.java      |  55 ++++++-
 .../index/dictionary/DictionaryInternerHolder.java |  58 +++++++
 .../index/readers/OnHeapStringDictionary.java      |  17 ++-
 .../index/dictionary/DictionaryIndexTypeTest.java  |  70 ++++++++-
 .../index/readers/ImmutableDictionaryTest.java     |   2 +-
 .../ImmutableDictionaryTypeConversionTest.java     |  25 ++-
 .../segment/spi/index/DictionaryIndexConfig.java   |  37 ++++-
 .../org/apache/pinot/spi/config/table/Intern.java  |  79 ++++++++++
 .../org/apache/pinot/spi/utils/FALFInterner.java   | 148 ++++++++++++++++++
 .../pinot/spi/config/table/IndexingConfigTest.java |   7 +-
 11 files changed, 642 insertions(+), 24 deletions(-)

diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/FALFInternerTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/FALFInternerTest.java
new file mode 100644
index 0000000000..7c36acb902
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/FALFInternerTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import java.util.Objects;
+import java.util.Random;
+import org.apache.pinot.spi.utils.FALFInterner;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FALFInternerTest {
+  @Test
+  public void testInterningByteBuffers() {
+    Random random = new Random(1);
+
+    int nUniqueObjs = 1024;
+    int nTotalObjs = 8 * nUniqueObjs;
+
+    String[] allObjs = new String[nTotalObjs];
+
+    // Create an array of objects where each object should have ~8 copies
+    for (int i = 0; i < nTotalObjs; i++) {
+      int next = random.nextInt(nUniqueObjs);
+      allObjs[i] = Integer.toString(next);
+    }
+
+    Interner<String> exactInterner = Interners.newStrongInterner();
+    Interner<String> falfInterner = new FALFInterner(nUniqueObjs);
+    Interner<String> falfInternerCustomHash = new FALFInterner(nUniqueObjs, s 
-> hashCode((String) s), Objects::equals);
+
+    // Go over all objects and intern them using both exact and FALF interners
+    int nHits1 = runInterning(allObjs, exactInterner, true);
+    int nHits2 = runInterning(allObjs, falfInterner, true);
+    int nHits3 = runInterning(allObjs, falfInternerCustomHash, true);
+
+    System.out.println(nHits1);
+    System.out.println(nHits2);
+    System.out.println(nHits3);
+
+    // For the exact interner, we should get a hit for each object except the
+    // first nUniqueObjs.
+    Assert.assertEquals(nHits1, nTotalObjs - nUniqueObjs);
+
+    // For the FALF interner, due to its fixed size and thus almost inevitable 
hash
+    // collisions, the number of hits is smaller. Let's verify that it's not 
too small, though.
+    Assert.assertTrue(nHits2 > (nTotalObjs - nUniqueObjs) * 0.4);
+
+    // With the better hash function, FALF interner should have more hits
+    Assert.assertTrue(nHits3 > (nTotalObjs - nUniqueObjs) * 0.6);
+
+    // Ad hoc benchmarking code. Disabled to avoid test slowdown.
+    // In one run the MacBook laptop, FALFInterner below performs nearly twice 
faster
+    // (1217 ms vs 2230 ms) With custom hash function, FALFInterner's speed is 
about the
+    // same as the Guava interner.
+//    for (int j = 0; j < 3; j++) {
+//      long time0 = System.currentTimeMillis();
+//      long totNHits = 0;
+//      for (int i = 0; i < 10000; i++) {
+//        totNHits += runInterning(allObjs, exactInterner, false);
+//      }
+//      long time1 = System.currentTimeMillis();
+//      System.out.println("Guava interner. totNHits = " + totNHits + ", time 
= " + (time1 - time0));
+//
+//      time0 = System.currentTimeMillis();
+//      totNHits = 0;
+//      for (int i = 0; i < 10000; i++) {
+//        totNHits += runInterning(allObjs, falfInterner, false);
+//      }
+//      time1 = System.currentTimeMillis();
+//      System.out.println("FALF interner. totNHits = " + totNHits + ", time = 
" + (time1 - time0));
+//
+//      time0 = System.currentTimeMillis();
+//      totNHits = 0;
+//      for (int i = 0; i < 10000; i++) {
+//        totNHits += runInterning(allObjs, falfInternerCustomHash, false);
+//      }
+//      time1 = System.currentTimeMillis();
+//      System.out.println("FALF interner Custom Hash. totNHits = " + totNHits 
+ ", time = " + (time1 - time0));
+//    }
+  }
+
+  private int runInterning(String[] objs, Interner<String> interner, boolean 
performAssert) {
+    int nHits = 0;
+    for (String origObj : objs) {
+      String internedObj = interner.intern(origObj);
+      if (performAssert) {
+        Assert.assertEquals(origObj, internedObj);
+      }
+      if (origObj != internedObj) {
+        nHits++;
+      }
+    }
+    return nHits;
+  }
+
+  // Custom hash code implementation, that gives better distribution than 
standard hashCode()
+
+  private static final int C1 = 0xcc9e2d51;
+  private static final int C2 = 0x1b873593;
+
+  public static int hashCode(String s) {
+    int h1 = 0;
+
+    // step through value 2 chars at a time
+    for (int i = 1; i < s.length(); i += 2) {
+      int k1 = s.charAt(i - 1) | (s.charAt(i) << 16);
+      h1 = nextHashCode(k1, h1);
+    }
+
+    // deal with any remaining characters
+    if ((s.length() & 1) == 1) {
+      int k1 = s.charAt(s.length() - 1);
+      k1 = mixK1(k1);
+      h1 ^= k1;
+    }
+
+    return fmix(h1, s.length() * 2);
+  }
+
+  private static int nextHashCode(int value, int prevHashCode) {
+    int k1 = mixK1(value);
+    return mixH1(prevHashCode, k1);
+  }
+
+  private static int mixK1(int k1) {
+    k1 *= C1;
+    k1 = Integer.rotateLeft(k1, 15);
+    k1 *= C2;
+    return k1;
+  }
+
+  private static int mixH1(int h1, int k1) {
+    h1 ^= k1;
+    h1 = Integer.rotateLeft(h1, 13);
+    h1 = h1 * 5 + 0xe6546b64;
+    return h1;
+  }
+
+  private static int fmix(int h1, int len) {
+    // Force all bits to avalanche
+    h1 ^= len;
+    h1 ^= h1 >>> 16;
+    h1 *= 0x85ebca6b;
+    h1 ^= h1 >>> 13;
+    h1 *= 0xc2b2ae35;
+    h1 ^= h1 >>> 16;
+    return h1;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
index 102361a733..0bc7845b8b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.segment.local.segment.index.dictionary;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
@@ -77,9 +78,11 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.Intern;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.FALFInterner;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,6 +114,7 @@ public class DictionaryIndexType
       if (noDictionaryCols.contains(column)) {
         result.put(column, DictionaryIndexConfig.disabled());
       } else {
+        // Intern configs can only be used if dictionary is enabled through 
FieldConfigLists.
         result.put(column, new 
DictionaryIndexConfig(onHeapCols.contains(column), 
varLengthCols.contains(column)));
       }
     }
@@ -162,6 +166,8 @@ public class DictionaryIndexType
       Set<String> varLength = new HashSet<>(
           ic.getVarLengthDictionaryColumns() == null ? Collections.emptyList() 
: ic.getVarLengthDictionaryColumns()
       );
+
+      // Intern configs can only be used if dictionary is enabled through 
FieldConfigLists.
       Function<String, DictionaryIndexConfig> valueCalculator =
           column -> new DictionaryIndexConfig(onHeap.contains(column), 
varLength.contains(column));
       return Sets.union(onHeap, varLength).stream()
@@ -281,11 +287,18 @@ public class DictionaryIndexType
 
   public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata 
metadata, DictionaryIndexConfig indexConfig)
       throws IOException {
+    return read(dataBuffer, metadata, indexConfig, null);
+  }
+
+  public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata 
metadata,
+      DictionaryIndexConfig indexConfig, String internIdentifierStr)
+      throws IOException {
+
     FieldSpec.DataType dataType = metadata.getDataType();
     boolean loadOnHeap = indexConfig.isOnHeap();
+    String columnName = metadata.getColumnName();
     if (loadOnHeap) {
-      String columnName = metadata.getColumnName();
-      LOGGER.info("Loading on-heap dictionary for column: {}", columnName);
+      LOGGER.info("Loading on-heap dictionary for column: {}, intern={}", 
columnName, internIdentifierStr != null);
     }
 
     int length = metadata.getCardinality();
@@ -308,7 +321,20 @@ public class DictionaryIndexType
             : new BigDecimalDictionary(dataBuffer, length, numBytesPerValue);
       case STRING:
         numBytesPerValue = metadata.getColumnMaxLength();
-        return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length, 
numBytesPerValue)
+
+        // If interning is enabled, get the required interners.
+        FALFInterner<String> strInterner = null;
+        FALFInterner<byte[]> byteInterner = null;
+        Intern internConfig = indexConfig.getIntern();
+        if (internConfig != null && !internConfig.isDisabled()) {
+          Preconditions.checkState(loadOnHeap, "Interning is only supported 
for on-heap dictionaries.");
+          DictionaryInternerHolder internerHolder = 
DictionaryInternerHolder.getInstance();
+          strInterner = internerHolder.getStrInterner(internIdentifierStr, 
internConfig.getCapacity());
+          byteInterner = internerHolder.getByteInterner(internIdentifierStr, 
internConfig.getCapacity());
+          LOGGER.info("Enabling interning for dictionary column: {}", 
columnName);
+        }
+
+        return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length, 
numBytesPerValue, strInterner, byteInterner)
             : new StringDictionary(dataBuffer, length, numBytesPerValue);
       case BYTES:
         numBytesPerValue = metadata.getColumnMaxLength();
@@ -352,9 +378,30 @@ public class DictionaryIndexType
     @Override
     protected Dictionary createIndexReader(PinotDataBuffer dataBuffer, 
ColumnMetadata metadata,
         DictionaryIndexConfig indexConfig)
-          throws IOException, IndexReaderConstraintException {
+        throws IOException, IndexReaderConstraintException {
       return DictionaryIndexType.read(dataBuffer, metadata, indexConfig);
     }
+
+    @Override
+    public Dictionary createIndexReader(SegmentDirectory.Reader segmentReader, 
FieldIndexConfigs fieldIndexConfigs,
+        ColumnMetadata metadata) throws IOException, 
IndexReaderConstraintException {
+      String colName = metadata.getColumnName();
+
+      if (!segmentReader.hasIndexFor(colName, StandardIndexes.dictionary())) {
+        return null;
+      }
+
+      PinotDataBuffer buffer = segmentReader.getIndexFor(colName, 
StandardIndexes.dictionary());
+      DictionaryIndexConfig config = 
fieldIndexConfigs.getConfig(StandardIndexes.dictionary());
+      String tableName = 
segmentReader.toSegmentDirectory().getSegmentMetadata().getTableName();
+      String internIdentifierStr = 
DictionaryInternerHolder.getInstance().createIdentifier(tableName, colName);
+
+      try {
+        return DictionaryIndexType.read(buffer, metadata, config, 
internIdentifierStr);
+      } catch (RuntimeException ex) {
+        throw new RuntimeException("Cannot read index " + 
StandardIndexes.dictionary() + " for column " + colName, ex);
+      }
+    }
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryInternerHolder.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryInternerHolder.java
new file mode 100644
index 0000000000..45b49ab752
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryInternerHolder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.dictionary;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.utils.FALFInterner;
+
+
+/**
+ * This class holds the dictionary interners. It is currently used only for 
OnHeapStringDictionary.
+ */
+public class DictionaryInternerHolder {
+  private static final DictionaryInternerHolder INSTANCE = new 
DictionaryInternerHolder();
+
+  // Map containing tableName + columnName as key and the interner as value. 
The interner is common across all the
+  // segments for a given table.
+  private Map<String, FALFInterner<String>> _strInternerInfoMap;
+  private Map<String, FALFInterner<byte[]>> _byteInternerInfoMap;
+
+  private DictionaryInternerHolder() {
+    _strInternerInfoMap = new ConcurrentHashMap<>();
+    _byteInternerInfoMap = new ConcurrentHashMap<>();
+  }
+
+  public static DictionaryInternerHolder getInstance() {
+    return INSTANCE;
+  }
+
+  public FALFInterner<String> getStrInterner(String columnIdentifier, int 
capacity) {
+    return _strInternerInfoMap.computeIfAbsent(columnIdentifier, k -> new 
FALFInterner<>(capacity));
+  }
+
+  public FALFInterner<byte[]> getByteInterner(String columnIdentifier, int 
capacity) {
+    return _byteInternerInfoMap.computeIfAbsent(columnIdentifier, k -> new 
FALFInterner<>(capacity, Arrays::hashCode));
+  }
+
+  public String createIdentifier(String tableName, String colName) {
+    return tableName + ":" + colName;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
index 13ab350b7f..3a647112c5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
@@ -21,9 +21,11 @@ package org.apache.pinot.segment.local.segment.index.readers;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.math.BigDecimal;
 import java.util.Arrays;
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.FALFInterner;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -42,7 +44,8 @@ public class OnHeapStringDictionary extends BaseImmutableDictionary {
   private final byte[][] _unpaddedBytes;
   private final Object2IntOpenHashMap<String> _unPaddedStringToIdMap;
 
-  public OnHeapStringDictionary(PinotDataBuffer dataBuffer, int length, int 
numBytesPerValue) {
+  public OnHeapStringDictionary(PinotDataBuffer dataBuffer, int length, int 
numBytesPerValue,
+      @Nullable FALFInterner<String> strInterner, @Nullable 
FALFInterner<byte[]> byteInterner) {
     super(dataBuffer, length, numBytesPerValue);
 
     _unpaddedBytes = new byte[length][];
@@ -51,9 +54,17 @@ public class OnHeapStringDictionary extends BaseImmutableDictionary {
     _unPaddedStringToIdMap.defaultReturnValue(Dictionary.NULL_VALUE_INDEX);
 
     byte[] buffer = new byte[numBytesPerValue];
+    boolean enableInterning = strInterner != null && byteInterner != null;
+
     for (int i = 0; i < length; i++) {
-      _unpaddedBytes[i] = getUnpaddedBytes(i, buffer);
-      _unpaddedStrings[i] = new String(_unpaddedBytes[i], UTF_8);
+      if (enableInterning) {
+        _unpaddedBytes[i] = byteInterner.intern(getUnpaddedBytes(i, buffer));
+        _unpaddedStrings[i] = strInterner.intern(new String(_unpaddedBytes[i], 
UTF_8));
+      } else {
+        _unpaddedBytes[i] = getUnpaddedBytes(i, buffer);
+        _unpaddedStrings[i] = new String(_unpaddedBytes[i], UTF_8);
+      }
+
       _unPaddedStringToIdMap.put(_unpaddedStrings[i], i);
     }
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java
index 8e3db4f99b..8a4ed209ec 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java
@@ -21,12 +21,14 @@ package org.apache.pinot.segment.local.segment.index.dictionary;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
 import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.Intern;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -38,7 +40,8 @@ public class DictionaryIndexTypeTest {
   public static class ConfTest extends AbstractSerdeIndexContract {
 
     protected void assertEquals(DictionaryIndexConfig expected) {
-      Assert.assertEquals(getActualConfig("dimInt", 
StandardIndexes.dictionary()), expected);
+      DictionaryIndexConfig actualConfig = getActualConfig("dimInt", 
StandardIndexes.dictionary());
+      Assert.assertEquals(actualConfig, expected);
     }
 
     @Test
@@ -132,7 +135,7 @@ public class DictionaryIndexTypeTest {
         throws IOException {
       _tableConfig.getIndexingConfig()
           .setOnHeapDictionaryColumns(JsonUtils.stringToObject("[\"dimInt\"]", 
_stringListTypeRef));
-      assertEquals(new DictionaryIndexConfig(true, null));
+      assertEquals(new DictionaryIndexConfig(true, null, null));
     }
 
     @Test
@@ -140,7 +143,7 @@ public class DictionaryIndexTypeTest {
         throws IOException {
       _tableConfig.getIndexingConfig()
           
.setVarLengthDictionaryColumns(JsonUtils.stringToObject("[\"dimInt\"]", 
_stringListTypeRef));
-      assertEquals(new DictionaryIndexConfig(false, true));
+      assertEquals(new DictionaryIndexConfig(false, true, null));
     }
 
     @Test
@@ -176,7 +179,7 @@ public class DictionaryIndexTypeTest {
           + "      }"
           + "    }\n"
           + " }");
-      assertEquals(new DictionaryIndexConfig(true, true));
+      assertEquals(new DictionaryIndexConfig(true, true, null));
     }
 
     @Test
@@ -191,7 +194,60 @@ public class DictionaryIndexTypeTest {
           + "      }"
           + "    }\n"
           + " }");
-      assertEquals(new DictionaryIndexConfig(true, false));
+      assertEquals(new DictionaryIndexConfig(true, false, null));
+    }
+
+    @Test
+    public void newOnHeapWithInternConfig()
+        throws IOException {
+      addFieldIndexConfig(""
+          + " {\n"
+          + "    \"name\": \"dimInt\","
+          + "    \"indexes\" : {\n"
+          + "      \"dictionary\": {\n"
+          + "        \"onHeap\": true,\n"
+          + "        \"intern\": {\n"
+          + "          \"capacity\":1000\n"
+          + "        }"
+          + "      }"
+          + "    }\n"
+          + " }");
+      assertEquals(new DictionaryIndexConfig(true, false, new Intern(1000)));
+    }
+
+    @Test
+    public void newDisabledOnHeapWithInternConfig()
+        throws IOException {
+      addFieldIndexConfig(""
+          + " {\n"
+          + "    \"name\": \"dimInt\","
+          + "    \"indexes\" : {\n"
+          + "      \"dictionary\": {\n"
+          + "        \"onHeap\": false,\n"
+          + "        \"intern\": {\n"
+          + "          \"capacity\":1000\n"
+          + "        }"
+          + "      }"
+          + "    }\n"
+          + " }");
+      assertThrows(UncheckedIOException.class, () -> getActualConfig("dimInt", 
StandardIndexes.dictionary()));
+    }
+
+    @Test
+    public void newOnHeapWithEmptyConfig()
+        throws IOException {
+      addFieldIndexConfig(""
+          + " {\n"
+          + "    \"name\": \"dimInt\","
+          + "    \"indexes\" : {\n"
+          + "      \"dictionary\": {\n"
+          + "        \"onHeap\": true,\n"
+          + "        \"intern\": {\n"
+          + "        }"
+          + "      }"
+          + "    }\n"
+          + " }");
+      assertThrows(UncheckedIOException.class, () -> getActualConfig("dimInt", 
StandardIndexes.dictionary()));
     }
 
     @Test
@@ -205,7 +261,7 @@ public class DictionaryIndexTypeTest {
           + "      }"
           + "    }\n"
           + " }");
-      assertEquals(new DictionaryIndexConfig(false, false));
+      assertEquals(new DictionaryIndexConfig(false, false, null));
     }
 
     @Test
@@ -220,7 +276,7 @@ public class DictionaryIndexTypeTest {
           + "      }"
           + "    }\n"
           + " }");
-      assertEquals(new DictionaryIndexConfig(false, true));
+      assertEquals(new DictionaryIndexConfig(false, true, null));
     }
 
     @Test
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
index bdab861630..c93768f083 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
@@ -372,7 +372,7 @@ public class ImmutableDictionaryTest {
     try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(
         PinotDataBuffer.mapReadOnlyBigEndianFile(
             new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
-        _numBytesPerStringValue)) {
+        _numBytesPerStringValue, null, null)) {
       testStringDictionary(onHeapStringDictionary);
     }
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
index 303f87b520..78e7b41c1a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
@@ -34,6 +34,7 @@ import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.FALFInterner;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -322,7 +323,29 @@ public class ImmutableDictionaryTypeConversionTest {
       throws Exception {
     try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(
         PinotDataBuffer.mapReadOnlyBigEndianFile(
-            new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH)) {
+            new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+        null, null)) {
+      testStringDictionary(onHeapStringDictionary);
+    }
+  }
+
+  @Test
+  public void testOnHeapStringDictionaryWithInterner()
+      throws Exception {
+    FALFInterner<String> strInterner = new FALFInterner<>(128);
+    FALFInterner<byte[]> byteInterner = new FALFInterner<>(128, 
Arrays::hashCode);
+
+    try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+        strInterner, byteInterner)) {
+      testStringDictionary(onHeapStringDictionary);
+    }
+
+    try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+        strInterner, byteInterner)) {
       testStringDictionary(onHeapStringDictionary);
     }
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java
index f93f13c32c..d63979f744 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java
@@ -21,29 +21,46 @@ package org.apache.pinot.segment.spi.index;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.Intern;
 
 
 public class DictionaryIndexConfig extends IndexConfig {
 
-  public static final DictionaryIndexConfig DEFAULT = new 
DictionaryIndexConfig(false, false, false);
-  public static final DictionaryIndexConfig DISABLED = new 
DictionaryIndexConfig(true, false, false);
+  public static final DictionaryIndexConfig DEFAULT = new 
DictionaryIndexConfig(false, false, false, Intern.DISABLED);
+  public static final DictionaryIndexConfig DISABLED = new 
DictionaryIndexConfig(true, false, false, Intern.DISABLED);
 
   private final boolean _onHeap;
   private final boolean _useVarLengthDictionary;
+  private final Intern _intern;
 
   public DictionaryIndexConfig(Boolean onHeap, @Nullable Boolean 
useVarLengthDictionary) {
-    this(false, onHeap, useVarLengthDictionary);
+    this(onHeap, useVarLengthDictionary, null);
+  }
+
+  public DictionaryIndexConfig(Boolean onHeap, @Nullable Boolean 
useVarLengthDictionary, Intern intern) {
+    this(false, onHeap, useVarLengthDictionary, intern);
   }
 
   @JsonCreator
   public DictionaryIndexConfig(@JsonProperty("disabled") Boolean disabled, 
@JsonProperty("onHeap") Boolean onHeap,
-      @JsonProperty("useVarLengthDictionary") @Nullable Boolean 
useVarLengthDictionary) {
+      @JsonProperty("useVarLengthDictionary") @Nullable Boolean 
useVarLengthDictionary,
+      @JsonProperty("intern") @Nullable Intern intern) {
     super(disabled);
+
+    if (intern != null) {
+      // Intern configs only work with onHeapDictionary. This precondition can 
be removed when/if we support interning
+      // for off-heap dictionary.
+      Preconditions.checkState(intern.isDisabled() || 
Boolean.TRUE.equals(onHeap),
+          "Intern configs only work with on-heap dictionary");
+    }
+
     _onHeap = onHeap != null && onHeap;
     _useVarLengthDictionary = Boolean.TRUE.equals(useVarLengthDictionary);
+    _intern = intern;
   }
 
   public static DictionaryIndexConfig disabled() {
@@ -58,6 +75,10 @@ public class DictionaryIndexConfig extends IndexConfig {
     return _useVarLengthDictionary;
   }
 
+  public Intern getIntern() {
+    return _intern;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -67,19 +88,21 @@ public class DictionaryIndexConfig extends IndexConfig {
       return false;
     }
     DictionaryIndexConfig that = (DictionaryIndexConfig) o;
-    return _onHeap == that._onHeap && _useVarLengthDictionary == 
that._useVarLengthDictionary;
+    return _onHeap == that._onHeap && _useVarLengthDictionary == 
that._useVarLengthDictionary && Objects.equals(_intern,
+        that._intern);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(_onHeap, _useVarLengthDictionary);
+    return Objects.hash(_onHeap, _useVarLengthDictionary, _intern);
   }
 
   @Override
   public String toString() {
     if (isEnabled()) {
+      String internStr = _intern == null ? "null" : _intern.toString();
       return "DictionaryIndexConfig{" + "\"onHeap\":" + _onHeap + ", 
\"useVarLengthDictionary\":"
-          + _useVarLengthDictionary + "}";
+          + _useVarLengthDictionary + ", \"intern\":" + internStr + "}";
     } else {
       return "DictionaryIndexConfig{" + "\"disabled\": true}";
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/Intern.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/Intern.java
new file mode 100644
index 0000000000..e5fbf9ec43
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/Intern.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+
+
+/**
+ * Class that holds the configurations regarding interning.
+ *
+ * <p>An instance is either disabled (capacity must be 0, see {@link #DISABLED}) or enabled with a
+ * strictly positive capacity; any other combination is rejected by the constructor. Instances are
+ * immutable.
+ */
+public class Intern {
+  /** Shared instance representing "interning disabled". */
+  public static final Intern DISABLED = new Intern(true, 0);
+
+  private final boolean _disabled;
+  private final int _capacity;
+
+  /**
+   * Creates an enabled interning config.
+   *
+   * @param capacity capacity of the interner; must be greater than 0
+   * @throws IllegalStateException if {@code capacity <= 0}
+   */
+  public Intern(int capacity) {
+    this(false, capacity);
+  }
+
+  /**
+   * Creates an interning config; also used by Jackson for deserialization.
+   *
+   * @param disabled whether interning is disabled
+   * @param capacity capacity of the interner; must be greater than 0 when enabled and exactly 0 when disabled
+   * @throws IllegalStateException if {@code disabled} and {@code capacity} are inconsistent
+   */
+  @JsonCreator
+  public Intern(@JsonProperty("disabled") boolean disabled, @JsonProperty("capacity") int capacity) {
+    Preconditions.checkState(capacity > 0 || disabled, "Invalid interner capacity: " + capacity);
+    Preconditions.checkState(capacity == 0 || !disabled, "Enable interning to use capacity > 0");
+
+    _disabled = disabled;
+    _capacity = capacity;
+  }
+
+  public boolean isDisabled() {
+    return _disabled;
+  }
+
+  public int getCapacity() {
+    return _capacity;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Intern that = (Intern) o;
+    return _disabled == that._disabled && _capacity == that._capacity;
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash exactly the fields compared in equals(). Mixing in super.hashCode() (Object's identity
+    // hash) would give equal instances different hash codes and break hash-based lookups.
+    return Objects.hash(_disabled, _capacity);
+  }
+
+  @Override
+  public String toString() {
+    // NOTE(review): no surrounding braces, so when embedded under "intern" by
+    // DictionaryIndexConfig#toString the result is not well-formed JSON.
+    return "\"disabled\":" + _disabled + ", \"capacity\":" + _capacity;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/FALFInterner.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/FALFInterner.java
new file mode 100644
index 0000000000..89a3355756
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/FALFInterner.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.utils;
+
+
+import com.google.common.collect.Interner;
+import java.util.Objects;
+import java.util.function.BiPredicate;
+import java.util.function.ToIntFunction;
+
+/**
+ * Fixed-size Array-based, Lock-Free Interner.
+ *
+ * !!!!!!!!!!!!!!! READ THE PARAGRAPH BELOW BEFORE USING THIS CLASS !!!!!!!!!!!!!!!!
+ * This class is technically not thread-safe. Therefore if it's called from multiple
+ * threads, it should either be used with proper synchronization (in the same way as
+ * you would use e.g. a HashMap), or under the following conditions:
+ * all the objects being interned are not just immutable, but also final (that is, all
+ * their fields used in equals() and hashCode() methods are explicitly marked final).
+ * That's to ensure that all threads always see the same contents of these objects. If
+ * this rule is not followed, using this class from multiple threads may lead to strange
+ * non-deterministic errors. Note that objects with all private fields that are not
+ * marked final, or immutable collections created via Collections.unmodifiableMap() etc,
+ * don't qualify.
+ * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ *
+ * This interner is intended to be used when either:
+ * (a) the distribution of values among the objects to be interned makes them unsuitable
+ *     for standard interners, or
+ * (b) speed is more important than ultimate memory savings.
+ *
+ * Problem (a) occurs when both the total number of objects AND the number of unique
+ * values are large. For example, there are 1M strings that look like "a", "a", "b", "b",
+ * "c", "c", ... - that is, for each unique value there are only two separate objects.
+ * Another problematic case is when "a" has 1000 copies, "b" has 900 copies, etc.,
+ * but in the last few hundred thousand objects each one is unique. In both cases, if
+ * we use a standard interner such as a Guava interner or a ConcurrentHashMap to
+ * deduplicate such objects, the amount of memory consumed by the interner itself to
+ * store objects that have few or no duplicates can be comparable to, or even exceed, the
+ * savings achieved by getting rid of duplicate objects.
+ *
+ * This implementation addresses the above problems by interning objects "optimistically".
+ * It is a fixed-size, array-based object cache in which every object maps to exactly one
+ * slot. When there is a cache miss, the cached object in that slot is always replaced with
+ * the new object. There is no locking and no synchronization, and thus, no associated
+ * overhead. In essence, this cache is based on the idea that an object with value X, that
+ * has many copies, has a higher chance of staying in the cache for long enough to guarantee
+ * several cache hits for itself before a miss evicts it and replaces it with an object with
+ * a different value Y.
+ *
+ * This interner has the minimum possible memory footprint. You should be careful when
+ * choosing its capacity. In general, the bigger the better, but if some of the objects
+ * that are interned eventually go away, an interner with too big a capacity may still
+ * keep these objects in memory. Also, since there are no collision chains, it is
+ * very important to use a hash function with the most uniform distribution, to minimize
+ * the chance that two or more objects with many duplicates compete for the same array slot.
+ *
+ * For more information, see https://dzone.com/articles/duplicate-objects-in-java-not-just-strings
+ * Credits to the author: Misha Dmitriev
+ */
+public class FALFInterner<T> implements Interner<T> {
+  /** Upper bound on the number of slots (same limit as java.util.HashMap). */
+  private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+  private final Object[] _cache;
+  // Mask mapping a spread hash to a slot index; valid because _cache.length is a power of two.
+  private final int _cacheLengthMinusOne;
+  private final BiPredicate<T, T> _equalsFunction;
+  private final ToIntFunction<T> _hashFunction;
+
+  /**
+   * Constructs a new instance with the specified capacity, hashing with
+   * {@link Objects#hashCode(Object)} and comparing with {@link Objects#equals(Object, Object)}.
+   * Actual capacity will be the smallest power of two >= expectedCapacity, but at least 1 and
+   * at most 2^30.
+   */
+  public FALFInterner(int expectedCapacity) {
+    this(expectedCapacity, Objects::hashCode);
+  }
+
+  /**
+   * Constructs a new instance with the specified capacity and a custom hash function; objects
+   * are compared with {@link Objects#equals(Object, Object)}.
+   * Actual capacity will be the smallest power of two >= expectedCapacity, but at least 1 and
+   * at most 2^30.
+   *
+   * @throws NullPointerException if {@code hashFunction} is null
+   */
+  public FALFInterner(int expectedCapacity, ToIntFunction<T> hashFunction) {
+    this(expectedCapacity, hashFunction, Objects::equals);
+  }
+
+  /**
+   * Constructs a new instance with the specified capacity and custom equals and hash functions.
+   * Actual capacity will be the smallest power of two >= expectedCapacity, but at least 1 and
+   * at most 2^30.
+   *
+   * @throws NullPointerException if {@code hashFunction} or {@code equalsFunction} is null
+   */
+  public FALFInterner(int expectedCapacity, ToIntFunction<T> hashFunction, BiPredicate<T, T> equalsFunction) {
+    _cache = new Object[tableSizeFor(expectedCapacity)];
+    _cacheLengthMinusOne = _cache.length - 1;
+    _equalsFunction = Objects.requireNonNull(equalsFunction);
+    _hashFunction = Objects.requireNonNull(hashFunction);
+  }
+
+  /**
+   * IMPORTANT: OBJECTS TO INTERN SHOULD BE IMMUTABLE AND FINAL!
+   * SEE THE JAVADOC OF THIS CLASS FOR MORE INFORMATION.
+   *
+   * Interns the given object. That is, if a cached object obj1 such that
+   * obj1.equals(obj) is available, returns obj1. Otherwise, caches obj and
+   * returns it. None of the cached objects is guaranteed to survive in the
+   * cache.
+   */
+  @Override
+  public T intern(T obj) {
+    int slot = hash(obj) & _cacheLengthMinusOne;
+    @SuppressWarnings("unchecked") // the only writes to _cache store values of type T
+    T cachedObj = (T) _cache[slot];
+    if (cachedObj != null && _equalsFunction.test(obj, cachedObj)) {
+      return cachedObj;
+    }
+    // Cache miss: unconditionally evict whatever currently occupies the slot.
+    _cache[slot] = obj;
+    return obj;
+  }
+
+  /**
+   * Applies the configured hash function and XORs the high 16 bits into the low 16 bits, so that
+   * high-order bits still influence the slot selected by masking with the (small) table mask.
+   */
+  private int hash(T key) {
+    int h = _hashFunction.applyAsInt(key);
+    return h ^ (h >>> 16);
+  }
+
+  /**
+   * Returns the smallest power of two >= {@code cap}, clamped to [1, MAXIMUM_CAPACITY].
+   */
+  private static int tableSizeFor(int cap) {
+    // Handle non-positive capacities up front: for cap == Integer.MIN_VALUE, "cap - 1" below
+    // would overflow to Integer.MAX_VALUE and the bit-smearing would yield MAXIMUM_CAPACITY.
+    if (cap <= 1) {
+      return 1;
+    }
+    // Calculated in the same way as in java.util.HashMap
+    int n = cap - 1;
+    n |= n >>> 1;
+    n |= n >>> 2;
+    n |= n >>> 4;
+    n |= n >>> 8;
+    n |= n >>> 16;
+    return (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
+  }
+}
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
index 5339d914b2..f97e9b30b1 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
@@ -46,6 +46,9 @@ public class IndexingConfigTest {
     indexingConfig.setOnHeapDictionaryColumns(onHeapDictionaryColumns);
     List<String> bloomFilterColumns = Arrays.asList("a", "b");
     indexingConfig.setBloomFilterColumns(bloomFilterColumns);
+    Map<String, BloomFilterConfig> bloomFilterConfigs = new HashMap<>();
+    bloomFilterConfigs.put("a", new BloomFilterConfig(0.123, 456, true));
+    indexingConfig.setBloomFilterConfigs(bloomFilterConfigs);
     Map<String, String> noDictionaryConfig = new HashMap<>();
     noDictionaryConfig.put("a", "SNAPPY");
     noDictionaryConfig.put("b", "PASS_THROUGH");
@@ -54,7 +57,8 @@ public class IndexingConfigTest {
     indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
     indexingConfig.setSegmentNameGeneratorType("normalizedDate");
 
-    indexingConfig = 
JsonUtils.stringToObject(JsonUtils.objectToString(indexingConfig), 
IndexingConfig.class);
+    String indexingConfigStr = JsonUtils.objectToString(indexingConfig);
+    indexingConfig = JsonUtils.stringToObject(indexingConfigStr, 
IndexingConfig.class);
 
     assertEquals(indexingConfig.getLoadMode(), "MMAP");
     assertTrue(indexingConfig.isAggregateMetrics());
@@ -62,6 +66,7 @@ public class IndexingConfigTest {
     assertEquals(indexingConfig.getSortedColumn(), sortedColumn);
     assertEquals(indexingConfig.getOnHeapDictionaryColumns(), 
onHeapDictionaryColumns);
     assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns);
+    assertEquals(indexingConfig.getBloomFilterConfigs(), bloomFilterConfigs);
     assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig);
     assertEquals(indexingConfig.getVarLengthDictionaryColumns(), 
varLengthDictionaryColumns);
     assertEquals(indexingConfig.getSegmentNameGeneratorType(), 
"normalizedDate");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to