This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2be1ae1  RowBasedIndexedTable: Add specialized index types for long 
keys. (#10430)
2be1ae1 is described below

commit 2be1ae128ff03b881c3dce04f3e7abdd8e4fe227
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 29 10:46:47 2020 -0700

    RowBasedIndexedTable: Add specialized index types for long keys. (#10430)
    
    * RowBasedIndexedTable: Add specialized index types for long keys.
    
    Two new index types are added:
    
    1) Use an int-array-based index in cases where keys are unique and the
       range of key values (max - min + 1) is small enough: below 250,000,
       or below roughly 15x the number of keys, whichever is larger.
    
    2) Use a Long2ObjectOpenHashMap (instead of the prior Java HashMap) in
       all other cases.
    
    In addition:
    
    1) RowBasedIndexBuilder, a new class, is responsible for picking which
       index implementation to use.
    
    2) The IndexedTable.Index interface is extended to support using
       unboxed primitives in the unique-long-keys case, and callers are
       updated to use the new functionality.
    
    Other key types continue to use indexes backed by Java HashMaps.
    
    * Fixup logic.
    
    * Add tests.
---
 .../join/table/BroadcastSegmentIndexedTable.java   |  39 +++--
 .../druid/segment/join/table/IndexedTable.java     |  30 +++-
 .../join/table/IndexedTableJoinMatcher.java        | 183 ++++++++++++++++-----
 .../apache/druid/segment/join/table/MapIndex.java  |  98 +++++++++++
 .../segment/join/table/RowBasedIndexBuilder.java   | 158 ++++++++++++++++++
 .../segment/join/table/RowBasedIndexedTable.java   |  54 ++----
 .../segment/join/table/UniqueLongArrayIndex.java   |  87 ++++++++++
 .../join/table/IndexedTableJoinMatcherTest.java    | 161 +++++++++++-------
 .../join/table/RowBasedIndexBuilderTest.java       | 182 ++++++++++++++++++++
 9 files changed, 831 insertions(+), 161 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
index 113bf3d..6481d89 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
@@ -20,8 +20,6 @@
 package org.apache.druid.segment.join.table;
 
 import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -52,7 +50,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -67,9 +64,13 @@ public class BroadcastSegmentIndexedTable implements 
IndexedTable
   private final Set<String> keyColumns;
   private final RowSignature rowSignature;
   private final String version;
-  private final List<Map<Object, IntList>> keyColumnsIndex;
+  private final List<Index> keyColumnsIndexes;
 
-  public BroadcastSegmentIndexedTable(final QueryableIndexSegment theSegment, 
final Set<String> keyColumns, final String version)
+  public BroadcastSegmentIndexedTable(
+      final QueryableIndexSegment theSegment,
+      final Set<String> keyColumns,
+      final String version
+  )
   {
     this.keyColumns = keyColumns;
     this.version = version;
@@ -92,19 +93,22 @@ public class BroadcastSegmentIndexedTable implements 
IndexedTable
     }
     this.rowSignature = sigBuilder.build();
 
-    // initialize keycolumn index maps
-    this.keyColumnsIndex = new ArrayList<>(rowSignature.size());
+    // initialize keycolumn index builders
+    final ArrayList<RowBasedIndexBuilder> indexBuilders = new 
ArrayList<>(rowSignature.size());
     final List<String> keyColumnNames = new ArrayList<>(keyColumns.size());
     for (int i = 0; i < rowSignature.size(); i++) {
-      final Map<Object, IntList> m;
+      final RowBasedIndexBuilder m;
       final String columnName = rowSignature.getColumnName(i);
       if (keyColumns.contains(columnName)) {
-        m = new HashMap<>();
+        final ValueType keyType =
+            
rowSignature.getColumnType(i).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
+
+        m = new RowBasedIndexBuilder(keyType);
         keyColumnNames.add(columnName);
       } else {
         m = null;
       }
-      keyColumnsIndex.add(m);
+      indexBuilders.add(m);
     }
 
     // sort of like the dump segment tool, but build key column indexes when 
reading the segment
@@ -143,12 +147,8 @@ public class BroadcastSegmentIndexedTable implements 
IndexedTable
             for (int keyColumnSelectorIndex = 0; keyColumnSelectorIndex < 
selectors.size(); keyColumnSelectorIndex++) {
               final String keyColumnName = 
keyColumnNames.get(keyColumnSelectorIndex);
               final int columnPosition = rowSignature.indexOf(keyColumnName);
-              final Map<Object, IntList> keyColumnValueIndex = 
keyColumnsIndex.get(columnPosition);
-              final Object key = 
selectors.get(keyColumnSelectorIndex).getObject();
-              if (key != null) {
-                final IntList array = keyColumnValueIndex.computeIfAbsent(key, 
k -> new IntArrayList());
-                array.add(rowNumber);
-              }
+              final RowBasedIndexBuilder keyColumnIndexBuilder = 
indexBuilders.get(columnPosition);
+              
keyColumnIndexBuilder.add(selectors.get(keyColumnSelectorIndex).getObject());
             }
 
             if (rowNumber % 100_000 == 0) {
@@ -166,6 +166,11 @@ public class BroadcastSegmentIndexedTable implements 
IndexedTable
     );
 
     Integer totalRows = sequence.accumulate(0, (accumulated, in) -> 
accumulated += in);
+
+    this.keyColumnsIndexes = indexBuilders.stream()
+                                          .map(builder -> builder != null ? 
builder.build() : null)
+                                          .collect(Collectors.toList());
+
     LOG.info("Created BroadcastSegmentIndexedTable with %s rows.", totalRows);
   }
 
@@ -196,7 +201,7 @@ public class BroadcastSegmentIndexedTable implements 
IndexedTable
   @Override
   public Index columnIndex(int column)
   {
-    return RowBasedIndexedTable.getKeyColumnIndex(column, keyColumnsIndex, 
rowSignature);
+    return RowBasedIndexedTable.getKeyColumnIndex(column, keyColumnsIndexes);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
index 62c7677..b1e4d67 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java
@@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ReferenceCountedObject;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.ReadableOffset;
 
 import javax.annotation.Nullable;
@@ -92,10 +93,37 @@ public interface IndexedTable extends 
ReferenceCountedObject, Closeable
    */
   interface Index
   {
+    int NOT_FOUND = -1;
+
+    /**
+     * Returns the natural key type for the index.
+     */
+    ValueType keyType();
+
+    /**
+     * Returns whether keys are unique in this index. If this returns true, 
then {@link #find(Object)} will only ever
+     * return a zero- or one-element list.
+     */
+    boolean areKeysUnique();
+
     /**
-     * Returns the list of row numbers where the column this Reader is based 
on contains 'key'.
+     * Returns the list of row numbers corresponding to "key" in this index.
+     *
+     * If "key" is some type other than the natural type {@link #keyType()}, 
it will be converted before checking
+     * the index.
      */
     IntList find(Object key);
+
+    /**
+     * Returns the row number corresponding to "key" in this index, or {@link 
#NOT_FOUND} if the key does not exist
+     * in the index.
+     *
+     * It is only valid to call this method if {@link #keyType()} is {@link 
ValueType#LONG} and {@link #areKeysUnique()}
+     * returns true.
+     *
+     * @throws UnsupportedOperationException if preconditions are not met
+     */
+    int findUniqueLong(long key);
   }
 
   /**
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java
index 2b6034c..3b1bccf 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcher.java
@@ -30,6 +30,7 @@ import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.QueryUnsupportedException;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
@@ -59,18 +60,19 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.function.Function;
 import java.util.function.IntFunction;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class IndexedTableJoinMatcher implements JoinMatcher
 {
+  static final int NO_CONDITION_MATCH = -1;
   private static final int UNINITIALIZED_CURRENT_ROW = -1;
 
   // Key column type to use when the actual key type is unknown.
   static final ValueType DEFAULT_KEY_TYPE = ValueType.STRING;
 
   private final IndexedTable table;
-  private final List<Supplier<IntIterator>> conditionMatchers;
+  private final List<ConditionMatcher> conditionMatchers;
+  private final boolean singleRowMatching;
   private final IntIterator[] currentMatchedRows;
   private final ColumnSelectorFactory selectorFactory;
 
@@ -103,13 +105,23 @@ public class IndexedTableJoinMatcher implements 
JoinMatcher
 
     if (condition.isAlwaysTrue()) {
       this.conditionMatchers = Collections.singletonList(() -> 
IntIterators.fromTo(0, table.numRows()));
+      this.singleRowMatching = false;
     } else if (condition.isAlwaysFalse()) {
       this.conditionMatchers = Collections.singletonList(() -> 
IntIterators.EMPTY_ITERATOR);
+      this.singleRowMatching = false;
     } else if (condition.getNonEquiConditions().isEmpty()) {
-      this.conditionMatchers = condition.getEquiConditions()
-                                        .stream()
-                                        .map(eq -> makeConditionMatcher(table, 
leftSelectorFactory, eq))
-                                        
.collect(Collectors.toCollection(ArrayList::new));
+      final List<Pair<IndexedTable.Index, Equality>> indexes =
+          condition.getEquiConditions()
+                   .stream()
+                   .map(eq -> Pair.of(getIndex(table, eq), eq))
+                   .collect(Collectors.toCollection(ArrayList::new));
+
+      this.conditionMatchers =
+          indexes.stream()
+                 .map(pair -> makeConditionMatcher(pair.lhs, 
leftSelectorFactory, pair.rhs))
+                 .collect(Collectors.toList());
+
+      this.singleRowMatching = indexes.stream().allMatch(pair -> 
pair.lhs.areKeysUnique());
     } else {
       throw new IAE(
           "Cannot build hash-join matcher on non-equi-join condition: %s",
@@ -117,7 +129,13 @@ public class IndexedTableJoinMatcher implements JoinMatcher
       );
     }
 
-    this.currentMatchedRows = new IntIterator[conditionMatchers.size()];
+    if (!singleRowMatching) {
+      // Only used by "matchCondition", and only in the multi-row-matching 
case.
+      this.currentMatchedRows = new IntIterator[conditionMatchers.size()];
+    } else {
+      this.currentMatchedRows = null;
+    }
+
     ColumnSelectorFactory selectorFactory = 
table.makeColumnSelectorFactory(joinableOffset, descending, closer);
     this.selectorFactory = selectorFactory != null
                            ? selectorFactory
@@ -131,9 +149,8 @@ public class IndexedTableJoinMatcher implements JoinMatcher
 
   }
 
-  private static Supplier<IntIterator> makeConditionMatcher(
+  private static IndexedTable.Index getIndex(
       final IndexedTable table,
-      final ColumnSelectorFactory selectorFactory,
       final Equality condition
   )
   {
@@ -143,15 +160,19 @@ public class IndexedTableJoinMatcher implements 
JoinMatcher
 
     final int keyColumnNumber = 
table.rowSignature().indexOf(condition.getRightColumn());
 
-    final ValueType keyType =
-        
table.rowSignature().getColumnType(condition.getRightColumn()).orElse(DEFAULT_KEY_TYPE);
-
-    final IndexedTable.Index index = table.columnIndex(keyColumnNumber);
+    return table.columnIndex(keyColumnNumber);
+  }
 
+  private static ConditionMatcher makeConditionMatcher(
+      final IndexedTable.Index index,
+      final ColumnSelectorFactory selectorFactory,
+      final Equality condition
+  )
+  {
     return ColumnProcessors.makeProcessor(
         condition.getLeftExpr(),
-        keyType,
-        new ConditionMatcherFactory(keyType, index),
+        index.keyType(),
+        new ConditionMatcherFactory(index),
         selectorFactory
     );
   }
@@ -167,22 +188,39 @@ public class IndexedTableJoinMatcher implements 
JoinMatcher
   {
     reset();
 
-    for (int i = 0; i < conditionMatchers.size(); i++) {
-      final IntIterator rows = conditionMatchers.get(i).get();
-      if (rows.hasNext()) {
-        currentMatchedRows[i] = rows;
+    if (singleRowMatching) {
+      if (conditionMatchers.size() == 1) {
+        currentRow = conditionMatchers.get(0).matchSingleRow();
       } else {
-        return;
-      }
-    }
+        currentRow = conditionMatchers.get(0).matchSingleRow();
 
-    if (currentMatchedRows.length == 1) {
-      currentIterator = currentMatchedRows[0];
+        for (int i = 1; i < conditionMatchers.size(); i++) {
+          if (currentRow != conditionMatchers.get(i).matchSingleRow()) {
+            currentRow = UNINITIALIZED_CURRENT_ROW;
+            break;
+          }
+        }
+      }
     } else {
-      currentIterator = new SortedIntIntersectionIterator(currentMatchedRows);
+      if (conditionMatchers.size() == 1) {
+        currentIterator = conditionMatchers.get(0).match();
+      } else {
+        for (int i = 0; i < conditionMatchers.size(); i++) {
+          final IntIterator rows = conditionMatchers.get(i).match();
+          if (rows.hasNext()) {
+            currentMatchedRows[i] = rows;
+          } else {
+            return;
+          }
+        }
+
+        currentIterator = new 
SortedIntIntersectionIterator(currentMatchedRows);
+      }
+
+      advanceCurrentRow();
     }
 
-    nextMatch();
+    addCurrentRowToMatchedRows();
   }
 
   @Override
@@ -225,7 +263,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
     };
 
     matchingRemainder = true;
-    nextMatch();
+    advanceCurrentRow();
   }
 
   @Override
@@ -244,10 +282,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
   public void nextMatch()
   {
     advanceCurrentRow();
-
-    if (!matchingRemainder && matchedRows != null && hasMatch()) {
-      matchedRows.add(currentRow);
-    }
+    addCurrentRowToMatchedRows();
   }
 
   @Override
@@ -272,11 +307,36 @@ public class IndexedTableJoinMatcher implements 
JoinMatcher
     }
   }
 
+  private void addCurrentRowToMatchedRows()
+  {
+    if (!matchingRemainder && matchedRows != null && hasMatch()) {
+      matchedRows.add(currentRow);
+    }
+  }
+
+  interface ConditionMatcher
+  {
+    /**
+     * Returns the first row that matches the current cursor position, or 
{@link #NO_CONDITION_MATCH} if nothing
+     * matches.
+     */
+    default int matchSingleRow()
+    {
+      final IntIterator it = match();
+      return it.hasNext() ? it.nextInt() : NO_CONDITION_MATCH;
+    }
+
+    /**
+     * Returns an iterator for the row numbers that match the current cursor 
position.
+     */
+    IntIterator match();
+  }
+
   /**
    * Makes suppliers that returns the list of IndexedTable rows that match the 
values from selectors.
    */
   @VisibleForTesting
-  static class ConditionMatcherFactory implements 
ColumnProcessorFactory<Supplier<IntIterator>>
+  static class ConditionMatcherFactory implements 
ColumnProcessorFactory<ConditionMatcher>
   {
     @VisibleForTesting
     static final int CACHE_MAX_SIZE = 1000;
@@ -290,9 +350,9 @@ public class IndexedTableJoinMatcher implements JoinMatcher
     @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")  // updated via 
computeIfAbsent
     private final LruLoadingHashMap<DimensionSelector, Int2IntListMap> 
dimensionCaches;
 
-    ConditionMatcherFactory(ValueType keyType, IndexedTable.Index index)
+    ConditionMatcherFactory(IndexedTable.Index index)
     {
-      this.keyType = keyType;
+      this.keyType = index.keyType();
       this.index = index;
 
       this.dimensionCaches = new LruLoadingHashMap<>(
@@ -325,7 +385,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
     }
 
     @Override
-    public Supplier<IntIterator> makeDimensionProcessor(DimensionSelector 
selector, boolean multiValue)
+    public ConditionMatcher makeDimensionProcessor(DimensionSelector selector, 
boolean multiValue)
     {
       // NOTE: The slow (cardinality unknown) and fast (cardinality known) 
code paths below only differ in the calls to
       // getRowNumbers() and getAndCacheRowNumbers(), respectively. The 
majority of the code path is duplicated to avoid
@@ -374,7 +434,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
     }
 
     @Override
-    public Supplier<IntIterator> 
makeFloatProcessor(BaseFloatColumnValueSelector selector)
+    public ConditionMatcher makeFloatProcessor(BaseFloatColumnValueSelector 
selector)
     {
       if (NullHandling.replaceWithDefault()) {
         return () -> index.find(selector.getFloat()).iterator();
@@ -384,7 +444,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
     }
 
     @Override
-    public Supplier<IntIterator> 
makeDoubleProcessor(BaseDoubleColumnValueSelector selector)
+    public ConditionMatcher makeDoubleProcessor(BaseDoubleColumnValueSelector 
selector)
     {
       if (NullHandling.replaceWithDefault()) {
         return () -> index.find(selector.getDouble()).iterator();
@@ -394,19 +454,58 @@ public class IndexedTableJoinMatcher implements 
JoinMatcher
     }
 
     @Override
-    public Supplier<IntIterator> makeLongProcessor(BaseLongColumnValueSelector 
selector)
+    public ConditionMatcher makeLongProcessor(BaseLongColumnValueSelector 
selector)
     {
       if (NullHandling.replaceWithDefault()) {
-        return () -> index.find(selector.getLong()).iterator();
+        return new ConditionMatcher()
+        {
+          @Override
+          public int matchSingleRow()
+          {
+            return index.findUniqueLong(selector.getLong());
+          }
+
+          @Override
+          public IntIterator match()
+          {
+            return index.find(selector.getLong()).iterator();
+          }
+        };
       } else {
-        return () -> selector.isNull() ? IntIterators.EMPTY_ITERATOR : 
index.find(selector.getLong()).iterator();
+        return new ConditionMatcher()
+        {
+          @Override
+          public int matchSingleRow()
+          {
+            return selector.isNull() ? NO_CONDITION_MATCH : 
index.findUniqueLong(selector.getLong());
+          }
+
+          @Override
+          public IntIterator match()
+          {
+            return selector.isNull() ? IntIterators.EMPTY_ITERATOR : 
index.find(selector.getLong()).iterator();
+          }
+        };
       }
     }
 
     @Override
-    public Supplier<IntIterator> 
makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
+    public ConditionMatcher 
makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
     {
-      return () -> IntIterators.EMPTY_ITERATOR;
+      return new ConditionMatcher()
+      {
+        @Override
+        public int matchSingleRow()
+        {
+          return NO_CONDITION_MATCH;
+        }
+
+        @Override
+        public IntIterator match()
+        {
+          return IntIterators.EMPTY_ITERATOR;
+        }
+      };
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/MapIndex.java 
b/processing/src/main/java/org/apache/druid/segment/join/table/MapIndex.java
new file mode 100644
index 0000000..df93f1b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/join/table/MapIndex.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntLists;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ValueType;
+
+import java.util.Map;
+
+/**
+ * An {@link IndexedTable.Index} backed by a Map.
+ */
+public class MapIndex implements IndexedTable.Index
+{
+  private final ValueType keyType;
+  private final Map<Object, IntList> index;
+  private final boolean keysUnique;
+  private final boolean isLong2ObjectMap;
+
+  /**
+   * Creates a new instance based on a particular map.
+   *
+   * @param keyType    type of keys in "index"
+   * @param index      a map of keys to matching row numbers
+   * @param keysUnique whether the keys are unique (if true: all IntLists in 
the index must be exactly 1 element)
+   *
+   * @see RowBasedIndexBuilder#build() the main caller
+   */
+  MapIndex(final ValueType keyType, final Map<Object, IntList> index, final 
boolean keysUnique)
+  {
+    this.keyType = Preconditions.checkNotNull(keyType, "keyType");
+    this.index = Preconditions.checkNotNull(index, "index");
+    this.keysUnique = keysUnique;
+    this.isLong2ObjectMap = index instanceof Long2ObjectMap;
+  }
+
+  @Override
+  public ValueType keyType()
+  {
+    return keyType;
+  }
+
+  @Override
+  public boolean areKeysUnique()
+  {
+    return keysUnique;
+  }
+
+  @Override
+  public IntList find(Object key)
+  {
+    final Object convertedKey = DimensionHandlerUtils.convertObjectToType(key, 
keyType, false);
+
+    if (convertedKey != null) {
+      final IntList found = index.get(convertedKey);
+      if (found != null) {
+        return found;
+      } else {
+        return IntLists.EMPTY_LIST;
+      }
+    } else {
+      return IntLists.EMPTY_LIST;
+    }
+  }
+
+  @Override
+  public int findUniqueLong(long key)
+  {
+    if (isLong2ObjectMap && keysUnique) {
+      final IntList rows = ((Long2ObjectMap<IntList>) (Map) index).get(key);
+      assert rows == null || rows.size() == 1;
+      return rows != null ? rows.getInt(0) : NOT_FOUND;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexBuilder.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexBuilder.java
new file mode 100644
index 0000000..605edb9
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexBuilder.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import com.google.common.primitives.Ints;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for creating {@link IndexedTable.Index} instances.
+ *
+ * Its main role is to decide which kind of implementation to use.
+ */
+public class RowBasedIndexBuilder
+{
+  // Long2ObjectOpenHashMap<IntList> is (very) roughly 15x bigger than int[] 
per entry.
+  private static final long INT_ARRAY_SPACE_SAVINGS_FACTOR = 15;
+
+  // A number that is small enough that we shouldn't worry about making a full 
array for it. (Yields a 1MB array.)
+  private static final long INT_ARRAY_SMALL_SIZE_OK = 250_000;
+
+  private int currentRow = 0;
+  private int nullKeys = 0;
+  private final ValueType keyType;
+  private final Map<Object, IntList> index;
+
+  private long minLongKey = Long.MAX_VALUE;
+  private long maxLongKey = Long.MIN_VALUE;
+
+  public RowBasedIndexBuilder(ValueType keyType)
+  {
+    this.keyType = keyType;
+
+    if (keyType == ValueType.LONG) {
+      // We're specializing the type even though we don't specialize usage in 
this class, for two reasons:
+      //  (1) It's still useful to reduce overall memory footprint.
+      //  (2) MapIndex specifically checks for Long2ObjectMap instances and 
*does* specialize usage.
+      final Long2ObjectOpenHashMap<IntList> theMap = new 
Long2ObjectOpenHashMap<>();
+      index = (Map) theMap;
+    } else {
+      index = new HashMap<>();
+    }
+  }
+
+  /**
+   * Add a key to the index. This must be called exactly once per row, even 
for null values or values that are the
+   * wrong type, because the builder keeps an internal row-number counter. The 
builder will handle both nulls and
+   * mismatched types, so callers do not need to worry about this.
+   */
+  public RowBasedIndexBuilder add(@Nullable final Object key)
+  {
+    final Object castKey = DimensionHandlerUtils.convertObjectToType(key, 
keyType);
+
+    if (castKey != null) {
+      final IntList rowNums = index.computeIfAbsent(castKey, k -> new 
IntArrayList());
+      rowNums.add(currentRow);
+
+      // Track min, max long value so we can decide later on if it's 
appropriate to use an array-backed implementation.
+      if (keyType == ValueType.LONG && (long) castKey < minLongKey) {
+        minLongKey = (long) castKey;
+      }
+
+      if (keyType == ValueType.LONG && (long) castKey > maxLongKey) {
+        maxLongKey = (long) castKey;
+      }
+    } else {
+      nullKeys++;
+    }
+
+    currentRow++;
+
+    return this;
+  }
+
+  /**
+   * Create the index. After calling this, the state of the builder is 
undefined, and you should discard it.
+   */
+  public IndexedTable.Index build()
+  {
+    final boolean keysUnique = index.size() == currentRow - nullKeys;
+
+    if (keyType == ValueType.LONG && keysUnique && index.size() > 0) {
+      // May be a good candidate for UniqueLongArrayIndex. Check the range of 
values as compared to min and max.
+      long range;
+
+      try {
+        // Add 1 so "range" would be equal to the size of the necessary array.
+        range = Math.addExact(Math.subtractExact(maxLongKey, minLongKey), 1);
+      }
+      catch (ArithmeticException e) {
+        // Overflow; way too big.
+        range = 0;
+      }
+
+      // Use a UniqueLongArrayIndex if the range of values is small enough.
+      final long rangeThreshold = Math.max(
+          INT_ARRAY_SMALL_SIZE_OK,
+          Math.min(Integer.MAX_VALUE, INT_ARRAY_SPACE_SAVINGS_FACTOR * 
index.size())
+      );
+
+      if (range > 0 && range < rangeThreshold) {
+        final int[] indexAsArray = new int[Ints.checkedCast(range)];
+        Arrays.fill(indexAsArray, IndexedTable.Index.NOT_FOUND);
+
+        // Safe to cast to Long2ObjectMap because the constructor always uses 
one for long-typed keys.
+        final ObjectIterator<Long2ObjectMap.Entry<IntList>> entries =
+            ((Long2ObjectMap<IntList>) ((Map) 
index)).long2ObjectEntrySet().iterator();
+
+        while (entries.hasNext()) {
+          final Long2ObjectMap.Entry<IntList> entry = entries.next();
+          final IntList rowNums = entry.getValue();
+
+          if (rowNums.size() != 1) {
+            throw new ISE("Expected single element");
+          }
+
+          indexAsArray[Ints.checkedCast(entry.getLongKey() - minLongKey)] = 
rowNums.getInt(0);
+          entries.remove();
+        }
+
+        assert index.isEmpty();
+
+        // Early return of specialized implementation.
+        return new UniqueLongArrayIndex(indexAsArray, minLongKey);
+      }
+    }
+
+    return new MapIndex(keyType, index, keysUnique);
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java
index c570017..15600ea 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java
@@ -20,21 +20,15 @@
 package org.apache.druid.segment.join.table;
 
 import com.google.common.collect.ImmutableSet;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
-import it.unimi.dsi.fastutil.ints.IntLists;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.RowAdapter;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 
 import java.io.Closeable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
@@ -47,7 +41,7 @@ import java.util.stream.Collectors;
 public class RowBasedIndexedTable<RowType> implements IndexedTable
 {
   private final List<RowType> table;
-  private final List<Map<Object, IntList>> index;
+  private final List<Index> indexes;
   private final RowSignature rowSignature;
   private final List<Function<RowType, Object>> columnFunctions;
   private final Set<String> keyColumns;
@@ -76,33 +70,29 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
       );
     }
 
-    index = new ArrayList<>(rowSignature.size());
+    indexes = new ArrayList<>(rowSignature.size());
 
     for (int i = 0; i < rowSignature.size(); i++) {
       final String column = rowSignature.getColumnName(i);
-      final Map<Object, IntList> m;
+      final Index m;
 
       if (keyColumns.contains(column)) {
         final ValueType keyType =
             
rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
 
+        final RowBasedIndexBuilder builder = new RowBasedIndexBuilder(keyType);
         final Function<RowType, Object> columnFunction = 
columnFunctions.get(i);
 
-        m = new HashMap<>();
-
-        for (int j = 0; j < table.size(); j++) {
-          final RowType row = table.get(j);
-          final Object key = 
DimensionHandlerUtils.convertObjectToType(columnFunction.apply(row), keyType);
-          if (key != null) {
-            final IntList array = m.computeIfAbsent(key, k -> new 
IntArrayList());
-            array.add(j);
-          }
+        for (final RowType row : table) {
+          builder.add(columnFunction.apply(row));
         }
+
+        m = builder.build();
       } else {
         m = null;
       }
 
-      index.add(m);
+      indexes.add(m);
     }
   }
 
@@ -127,7 +117,7 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
   @Override
   public Index columnIndex(int column)
   {
-    return getKeyColumnIndex(column, index, rowSignature);
+    return getKeyColumnIndex(column, indexes);
   }
 
   @Override
@@ -161,30 +151,14 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
     // nothing to close
   }
 
-  static Index getKeyColumnIndex(int column, List<Map<Object, IntList>> 
keyColumnsIndex, RowSignature rowSignature)
+  static Index getKeyColumnIndex(int column, List<Index> indexes)
   {
-    final Map<Object, IntList> indexMap = keyColumnsIndex.get(column);
+    final Index index = indexes.get(column);
 
-    if (indexMap == null) {
+    if (index == null) {
       throw new IAE("Column[%d] is not a key column", column);
     }
 
-    final ValueType columnType =
-        
rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
-
-    return key -> {
-      final Object convertedKey = 
DimensionHandlerUtils.convertObjectToType(key, columnType, false);
-
-      if (convertedKey != null) {
-        final IntList found = indexMap.get(convertedKey);
-        if (found != null) {
-          return found;
-        } else {
-          return IntLists.EMPTY_LIST;
-        }
-      } else {
-        return IntLists.EMPTY_LIST;
-      }
-    };
+    return index;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/UniqueLongArrayIndex.java b/processing/src/main/java/org/apache/druid/segment/join/table/UniqueLongArrayIndex.java
new file mode 100644
index 0000000..5339b6e
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/join/table/UniqueLongArrayIndex.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntLists;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ValueType;
+
+/**
+ * An {@link IndexedTable.Index} backed by an int array.
+ *
+ * This is for long-typed keys whose values all fall in a "reasonable" range.
+ */
+public class UniqueLongArrayIndex implements IndexedTable.Index
+{
+  private final int[] index;
+  private final long minKey;
+
+  /**
+   * Create a new instance backed by a given array.
+   *
+   * @param index  an int array where position {@code i} corresponds to the key {@code i + minKey}
+   * @param minKey lowest-valued key
+   *
+   * @see RowBasedIndexBuilder#build() the main caller
+   */
+  UniqueLongArrayIndex(int[] index, long minKey)
+  {
+    this.index = index;
+    this.minKey = minKey;
+  }
+
+  @Override
+  public ValueType keyType()
+  {
+    return ValueType.LONG;
+  }
+
+  @Override
+  public boolean areKeysUnique()
+  {
+    return true;
+  }
+
+  @Override
+  public IntList find(Object key)
+  {
+    final Long longKey = DimensionHandlerUtils.convertObjectToLong(key);
+
+    if (longKey != null) {
+      final int row = findUniqueLong(longKey);
+      if (row >= 0) {
+        return IntLists.singleton(row);
+      }
+    }
+
+    return IntLists.EMPTY_LIST;
+  }
+
+  @Override
+  public int findUniqueLong(long key)
+  {
+    if (key >= minKey && key < (minKey + index.length)) {
+      return index[(int) (key - minKey)];
+    } else {
+      return NOT_FOUND;
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java
index 45367b2..6bbcbb3 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinMatcherTest.java
@@ -19,9 +19,10 @@
 
 package org.apache.druid.segment.join.table;
 
+import com.google.common.collect.ImmutableList;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntLists;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.QueryUnsupportedException;
 import org.apache.druid.segment.ConstantDimensionSelector;
@@ -31,18 +32,18 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.ArrayBasedIndexedInts;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import java.util.Collections;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.IntFunction;
-import java.util.function.Supplier;
 
 @RunWith(Enclosed.class)
 public class IndexedTableJoinMatcherTest
@@ -54,6 +55,9 @@ public class IndexedTableJoinMatcherTest
   {
     public static class MakeDimensionProcessorTest
     {
+      @Rule
+      public ExpectedException expectedException = ExpectedException.none();
+
       @Mock
       private DimensionSelector dimensionSelector;
 
@@ -63,8 +67,7 @@ public class IndexedTableJoinMatcherTest
         NullHandling.initializeForTests();
       }
 
-      @SuppressWarnings("ReturnValueIgnored")
-      @Test(expected = QueryUnsupportedException.class)
+      @Test
       public void 
testMatchMultiValuedRowCardinalityUnknownShouldThrowException()
       {
         MockitoAnnotations.initMocks(this);
@@ -73,17 +76,18 @@ public class IndexedTableJoinMatcherTest
         
Mockito.doReturn(DimensionDictionarySelector.CARDINALITY_UNKNOWN).when(dimensionSelector).getValueCardinality();
 
         IndexedTableJoinMatcher.ConditionMatcherFactory 
conditionMatcherFactory =
-            new IndexedTableJoinMatcher.ConditionMatcherFactory(
-                ValueType.STRING,
-                IndexedTableJoinMatcherTest::createSingletonIntList
-            );
-        Supplier<IntIterator> dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(dimensionSelector, false);
+            new 
IndexedTableJoinMatcher.ConditionMatcherFactory(stringToLengthIndex());
+        IndexedTableJoinMatcher.ConditionMatcher dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(
+            dimensionSelector,
+            false
+        );
+
         // Test match should throw exception
-        dimensionProcessor.get();
+        expectedException.expect(QueryUnsupportedException.class);
+        dimensionProcessor.match();
       }
 
-      @SuppressWarnings("ReturnValueIgnored")
-      @Test(expected = QueryUnsupportedException.class)
+      @Test
       public void testMatchMultiValuedRowCardinalityKnownShouldThrowException()
       {
         MockitoAnnotations.initMocks(this);
@@ -92,13 +96,15 @@ public class IndexedTableJoinMatcherTest
         Mockito.doReturn(3).when(dimensionSelector).getValueCardinality();
 
         IndexedTableJoinMatcher.ConditionMatcherFactory 
conditionMatcherFactory =
-            new IndexedTableJoinMatcher.ConditionMatcherFactory(
-                ValueType.STRING,
-                IndexedTableJoinMatcherTest::createSingletonIntList
-            );
-        Supplier<IntIterator> dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(dimensionSelector, false);
+            new 
IndexedTableJoinMatcher.ConditionMatcherFactory(stringToLengthIndex());
+        IndexedTableJoinMatcher.ConditionMatcher dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(
+            dimensionSelector,
+            false
+        );
+
         // Test match should throw exception
-        dimensionProcessor.get();
+        expectedException.expect(QueryUnsupportedException.class);
+        dimensionProcessor.match();
       }
 
       @Test
@@ -110,13 +116,16 @@ public class IndexedTableJoinMatcherTest
         
Mockito.doReturn(DimensionDictionarySelector.CARDINALITY_UNKNOWN).when(dimensionSelector).getValueCardinality();
 
         IndexedTableJoinMatcher.ConditionMatcherFactory 
conditionMatcherFactory =
-            new IndexedTableJoinMatcher.ConditionMatcherFactory(
-                ValueType.STRING,
-                IndexedTableJoinMatcherTest::createSingletonIntList
-            );
-        Supplier<IntIterator> dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(dimensionSelector, false);
-        Assert.assertNotNull(dimensionProcessor.get());
-        Assert.assertFalse(dimensionProcessor.get().hasNext());
+            new 
IndexedTableJoinMatcher.ConditionMatcherFactory(stringToLengthIndex());
+        IndexedTableJoinMatcher.ConditionMatcher dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(
+            dimensionSelector,
+            false
+        );
+
+        Assert.assertNotNull(dimensionProcessor.match());
+        Assert.assertFalse(dimensionProcessor.match().hasNext());
+
+        Assert.assertEquals(IndexedTableJoinMatcher.NO_CONDITION_MATCH, 
dimensionProcessor.matchSingleRow());
       }
 
       @Test
@@ -128,46 +137,56 @@ public class IndexedTableJoinMatcherTest
         Mockito.doReturn(0).when(dimensionSelector).getValueCardinality();
 
         IndexedTableJoinMatcher.ConditionMatcherFactory 
conditionMatcherFactory =
-            new IndexedTableJoinMatcher.ConditionMatcherFactory(
-                ValueType.STRING,
-                IndexedTableJoinMatcherTest::createSingletonIntList
-            );
-        Supplier<IntIterator> dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(dimensionSelector, false);
-        Assert.assertNotNull(dimensionProcessor.get());
-        Assert.assertFalse(dimensionProcessor.get().hasNext());
+            new 
IndexedTableJoinMatcher.ConditionMatcherFactory(stringToLengthIndex());
+        IndexedTableJoinMatcher.ConditionMatcher dimensionProcessor = 
conditionMatcherFactory.makeDimensionProcessor(
+            dimensionSelector,
+            false
+        );
+
+        Assert.assertNotNull(dimensionProcessor.match());
+        Assert.assertFalse(dimensionProcessor.match().hasNext());
+
+        Assert.assertEquals(IndexedTableJoinMatcher.NO_CONDITION_MATCH, 
dimensionProcessor.matchSingleRow());
       }
 
       @Test
       public void getsCorrectResultWhenSelectorCardinalityUnknown()
       {
-        Supplier<IntIterator> target = 
makeDimensionProcessor(DimensionDictionarySelector.CARDINALITY_UNKNOWN);
-        Assert.assertEquals(KEY.length(), target.get().nextInt());
+        IndexedTableJoinMatcher.ConditionMatcher target =
+            
makeConditionMatcher(DimensionDictionarySelector.CARDINALITY_UNKNOWN);
+
+        Assert.assertEquals(ImmutableList.of(KEY.length()), new 
IntArrayList(target.match()));
+        Assert.assertEquals(KEY.length(), target.matchSingleRow());
       }
 
       @Test
       public void getsCorrectResultWhenSelectorCardinalityLow()
       {
         int lowCardinality = 
IndexedTableJoinMatcher.ConditionMatcherFactory.CACHE_MAX_SIZE / 10;
-        Supplier<IntIterator> target = makeDimensionProcessor(lowCardinality);
-        Assert.assertEquals(KEY.length(), target.get().nextInt());
+        IndexedTableJoinMatcher.ConditionMatcher target = 
makeConditionMatcher(lowCardinality);
+
+        Assert.assertEquals(ImmutableList.of(KEY.length()), new 
IntArrayList(target.match()));
+        Assert.assertEquals(KEY.length(), target.matchSingleRow());
       }
 
       @Test
       public void getsCorrectResultWhenSelectorCardinalityHigh()
       {
         int highCardinality = 
IndexedTableJoinMatcher.ConditionMatcherFactory.CACHE_MAX_SIZE / 10;
-        Supplier<IntIterator> target = makeDimensionProcessor(highCardinality);
-        Assert.assertEquals(KEY.length(), target.get().nextInt());
+        IndexedTableJoinMatcher.ConditionMatcher target = 
makeConditionMatcher(highCardinality);
+
+        Assert.assertEquals(ImmutableList.of(KEY.length()), new 
IntArrayList(target.match()));
+        Assert.assertEquals(KEY.length(), target.matchSingleRow());
       }
 
-      private static Supplier<IntIterator> makeDimensionProcessor(int 
valueCardinality)
+      private static IndexedTableJoinMatcher.ConditionMatcher 
makeConditionMatcher(int valueCardinality)
       {
         IndexedTableJoinMatcher.ConditionMatcherFactory 
conditionMatcherFactory =
-            new IndexedTableJoinMatcher.ConditionMatcherFactory(
-                ValueType.STRING,
-                IndexedTableJoinMatcherTest::createSingletonIntList
-            );
-        return conditionMatcherFactory.makeDimensionProcessor(new 
TestDimensionSelector(KEY, valueCardinality), false);
+            new 
IndexedTableJoinMatcher.ConditionMatcherFactory(stringToLengthIndex());
+        return conditionMatcherFactory.makeDimensionProcessor(
+            new TestDimensionSelector(KEY, valueCardinality),
+            false
+        );
       }
 
       private static class TestDimensionSelector extends 
ConstantDimensionSelector
@@ -255,7 +274,7 @@ public class IndexedTableJoinMatcherTest
       counter = new AtomicLong(0);
       IntFunction<IntList> loader = key -> {
         counter.incrementAndGet();
-        return createSingletonIntList(key);
+        return IntLists.singleton(key);
       };
 
       target = new IndexedTableJoinMatcher.Int2IntListLookupTable(SIZE, 
loader);
@@ -265,7 +284,7 @@ public class IndexedTableJoinMatcherTest
     public void loadsValueIfAbsent()
     {
       int key = 1;
-      Assert.assertEquals(createSingletonIntList(key), 
target.getAndLoadIfAbsent(key));
+      Assert.assertEquals(IntLists.singleton(key), 
target.getAndLoadIfAbsent(key));
       Assert.assertEquals(1L, counter.longValue());
     }
 
@@ -273,8 +292,8 @@ public class IndexedTableJoinMatcherTest
     public void doesNotLoadIfPresent()
     {
       int key = 1;
-      Assert.assertEquals(createSingletonIntList(key), 
target.getAndLoadIfAbsent(key));
-      Assert.assertEquals(createSingletonIntList(key), 
target.getAndLoadIfAbsent(key));
+      Assert.assertEquals(IntLists.singleton(key), 
target.getAndLoadIfAbsent(key));
+      Assert.assertEquals(IntLists.singleton(key), 
target.getAndLoadIfAbsent(key));
       Assert.assertEquals(1L, counter.longValue());
     }
   }
@@ -291,7 +310,7 @@ public class IndexedTableJoinMatcherTest
       counter = new AtomicLong(0);
       IntFunction<IntList> loader = key -> {
         counter.incrementAndGet();
-        return createSingletonIntList(key);
+        return IntLists.singleton(key);
       };
 
       target = new IndexedTableJoinMatcher.Int2IntListLruCache(SIZE, loader);
@@ -301,7 +320,7 @@ public class IndexedTableJoinMatcherTest
     public void loadsValueIfAbsent()
     {
       int key = 1;
-      Assert.assertEquals(createSingletonIntList(key), 
target.getAndLoadIfAbsent(key));
+      Assert.assertEquals(IntLists.singleton(key), 
target.getAndLoadIfAbsent(key));
       Assert.assertEquals(1L, counter.longValue());
     }
 
@@ -309,8 +328,8 @@ public class IndexedTableJoinMatcherTest
     public void doesNotLoadIfPresent()
     {
       int key = 1;
-      Assert.assertEquals(createSingletonIntList(key), 
target.getAndLoadIfAbsent(key));
-      Assert.assertEquals(createSingletonIntList(key), 
target.getAndLoadIfAbsent(key));
+      Assert.assertEquals(IntLists.singleton(key), 
target.getAndLoadIfAbsent(key));
+      Assert.assertEquals(IntLists.singleton(key), 
target.getAndLoadIfAbsent(key));
       Assert.assertEquals(1L, counter.longValue());
     }
 
@@ -321,23 +340,43 @@ public class IndexedTableJoinMatcherTest
       int next = start + SIZE;
 
       for (int key = start; key < next; key++) {
-        Assert.assertEquals(createSingletonIntList(key), 
target.getAndLoadIfAbsent(key));
+        Assert.assertEquals(IntLists.singleton(key), 
target.getAndLoadIfAbsent(key));
       }
 
-      Assert.assertEquals(createSingletonIntList(next), 
target.getAndLoadIfAbsent(next));
+      Assert.assertEquals(IntLists.singleton(next), 
target.getAndLoadIfAbsent(next));
       Assert.assertNull(target.get(start));
 
       Assert.assertEquals(SIZE + 1, counter.longValue());
     }
   }
 
-  private static IntList createSingletonIntList(Object value)
+  private static IndexedTable.Index stringToLengthIndex()
   {
-    return createSingletonIntList(((String) value).length());
-  }
+    return new IndexedTable.Index()
+    {
+      @Override
+      public ValueType keyType()
+      {
+        return ValueType.STRING;
+      }
 
-  private static IntList createSingletonIntList(int value)
-  {
-    return new IntArrayList(Collections.singleton(value));
+      @Override
+      public boolean areKeysUnique()
+      {
+        return false;
+      }
+
+      @Override
+      public IntList find(Object key)
+      {
+        return IntLists.singleton(((String) key).length());
+      }
+
+      @Override
+      public int findUniqueLong(long key)
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexBuilderTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexBuilderTest.java
new file mode 100644
index 0000000..5925c5f
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexBuilderTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import org.apache.druid.segment.column.ValueType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class RowBasedIndexBuilderTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void test_stringKey_uniqueKeys()
+  {
+    final RowBasedIndexBuilder builder =
+        new RowBasedIndexBuilder(ValueType.STRING)
+            .add("abc")
+            .add("")
+            .add(null)
+            .add("1")
+            .add("def");
+
+    final IndexedTable.Index index = builder.build();
+
+    Assert.assertThat(index, CoreMatchers.instanceOf(MapIndex.class));
+    Assert.assertEquals(ValueType.STRING, index.keyType());
+    Assert.assertTrue(index.areKeysUnique());
+
+    Assert.assertEquals(intList(0), index.find("abc"));
+    Assert.assertEquals(intList(1), index.find(""));
+    Assert.assertEquals(intList(3), index.find(1L));
+    Assert.assertEquals(intList(3), index.find("1"));
+    Assert.assertEquals(intList(4), index.find("def"));
+    Assert.assertEquals(intList(), index.find(null));
+    Assert.assertEquals(intList(), index.find("nonexistent"));
+
+    expectedException.expect(UnsupportedOperationException.class);
+    index.findUniqueLong(0L);
+  }
+
+  @Test
+  public void test_stringKey_duplicateKeys()
+  {
+    final RowBasedIndexBuilder builder =
+        new RowBasedIndexBuilder(ValueType.STRING)
+            .add("abc")
+            .add("")
+            .add(null)
+            .add("abc")
+            .add("1")
+            .add("def");
+
+    final IndexedTable.Index index = builder.build();
+
+    Assert.assertThat(index, CoreMatchers.instanceOf(MapIndex.class));
+    Assert.assertEquals(ValueType.STRING, index.keyType());
+    Assert.assertFalse(index.areKeysUnique());
+
+    Assert.assertEquals(intList(0, 3), index.find("abc"));
+    Assert.assertEquals(intList(1), index.find(""));
+    Assert.assertEquals(intList(4), index.find(1L));
+    Assert.assertEquals(intList(4), index.find("1"));
+    Assert.assertEquals(intList(5), index.find("def"));
+    Assert.assertEquals(intList(), index.find(null));
+    Assert.assertEquals(intList(), index.find("nonexistent"));
+
+    expectedException.expect(UnsupportedOperationException.class);
+    index.findUniqueLong(0L);
+  }
+
+  @Test
+  public void test_longKey_uniqueKeys()
+  {
+    final RowBasedIndexBuilder builder =
+        new RowBasedIndexBuilder(ValueType.LONG)
+            .add(1)
+            .add(5)
+            .add(2);
+
+    final IndexedTable.Index index = builder.build();
+
+    Assert.assertThat(index, 
CoreMatchers.instanceOf(UniqueLongArrayIndex.class));
+    Assert.assertEquals(ValueType.LONG, index.keyType());
+    Assert.assertTrue(index.areKeysUnique());
+
+    Assert.assertEquals(intList(0), index.find(1L));
+    Assert.assertEquals(intList(1), index.find(5L));
+    Assert.assertEquals(intList(2), index.find(2L));
+    Assert.assertEquals(intList(), index.find(3L));
+
+    Assert.assertEquals(0, index.findUniqueLong(1L));
+    Assert.assertEquals(1, index.findUniqueLong(5L));
+    Assert.assertEquals(2, index.findUniqueLong(2L));
+    Assert.assertEquals(IndexedTable.Index.NOT_FOUND, 
index.findUniqueLong(3L));
+  }
+
+  @Test
+  public void test_longKey_uniqueKeys_farApart()
+  {
+    final RowBasedIndexBuilder builder =
+        new RowBasedIndexBuilder(ValueType.LONG)
+            .add(1)
+            .add(10_000_000)
+            .add(2);
+
+    final IndexedTable.Index index = builder.build();
+
+    Assert.assertThat(index, CoreMatchers.instanceOf(MapIndex.class));
+    Assert.assertEquals(ValueType.LONG, index.keyType());
+    Assert.assertTrue(index.areKeysUnique());
+
+    Assert.assertEquals(intList(0), index.find(1L));
+    Assert.assertEquals(intList(1), index.find(10_000_000L));
+    Assert.assertEquals(intList(2), index.find(2L));
+    Assert.assertEquals(intList(), index.find(3L));
+
+    Assert.assertEquals(0, index.findUniqueLong(1L));
+    Assert.assertEquals(1, index.findUniqueLong(10_000_000L));
+    Assert.assertEquals(2, index.findUniqueLong(2L));
+    Assert.assertEquals(IndexedTable.Index.NOT_FOUND, 
index.findUniqueLong(3L));
+  }
+
+  @Test
+  public void test_longKey_duplicateKeys()
+  {
+    final RowBasedIndexBuilder builder =
+        new RowBasedIndexBuilder(ValueType.LONG)
+            .add(1)
+            .add(5)
+            .add(1)
+            .add(2);
+
+    final IndexedTable.Index index = builder.build();
+
+    Assert.assertThat(index, CoreMatchers.instanceOf(MapIndex.class));
+    Assert.assertEquals(ValueType.LONG, index.keyType());
+    Assert.assertFalse(index.areKeysUnique());
+
+    Assert.assertEquals(intList(0, 2), index.find("1"));
+    Assert.assertEquals(intList(0, 2), index.find(1));
+    Assert.assertEquals(intList(0, 2), index.find(1L));
+    Assert.assertEquals(intList(1), index.find(5L));
+    Assert.assertEquals(intList(3), index.find(2L));
+    Assert.assertEquals(intList(), index.find(3L));
+
+    expectedException.expect(UnsupportedOperationException.class);
+    index.findUniqueLong(5L);
+  }
+
+  public IntList intList(final int... ints)
+  {
+    final IntArrayList retVal = new IntArrayList(ints.length);
+    for (int i : ints) {
+      retVal.add(i);
+    }
+    return retVal;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to