jon-wei commented on a change in pull request #9111: Add HashJoinSegment, a 
virtual segment for joins.
URL: https://github.com/apache/druid/pull/9111#discussion_r366125850
 
 

 ##########
 File path: 
processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java
 ##########
 @@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.lookup;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.guava.SettableSupplier;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.query.lookup.LookupExtractor;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.join.Equality;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.JoinMatcher;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class LookupJoinMatcher implements JoinMatcher
+{
+  private static final ColumnProcessorFactory<Supplier<String>> 
LEFT_KEY_READER =
+      new ColumnProcessorFactory<Supplier<String>>()
+      {
+        @Override
+        public ValueType defaultType()
+        {
+          return ValueType.STRING;
+        }
+
+        @Override
+        public Supplier<String> makeDimensionProcessor(DimensionSelector 
selector)
+        {
+          return () -> {
+            final IndexedInts row = selector.getRow();
+
+            if (row.size() == 1) {
+              return selector.lookupName(row.get(0));
+            } else {
+              // Multi-valued rows are not handled by the join system right 
now; treat them as nulls.
+              return null;
+            }
+          };
+        }
+
+        @Override
+        public Supplier<String> 
makeFloatProcessor(BaseFloatColumnValueSelector selector)
+        {
+          if (NullHandling.replaceWithDefault()) {
+            return () -> 
DimensionHandlerUtils.convertObjectToString(selector.getFloat());
+          } else {
+            return () -> selector.isNull() ? null : 
DimensionHandlerUtils.convertObjectToString(selector.getFloat());
+          }
+        }
+
+        @Override
+        public Supplier<String> 
makeDoubleProcessor(BaseDoubleColumnValueSelector selector)
+        {
+          if (NullHandling.replaceWithDefault()) {
+            return () -> 
DimensionHandlerUtils.convertObjectToString(selector.getDouble());
+          } else {
+            return () -> selector.isNull() ? null : 
DimensionHandlerUtils.convertObjectToString(selector.getDouble());
+          }
+        }
+
+        @Override
+        public Supplier<String> makeLongProcessor(BaseLongColumnValueSelector 
selector)
+        {
+          if (NullHandling.replaceWithDefault()) {
+            return () -> 
DimensionHandlerUtils.convertObjectToString(selector.getLong());
+          } else {
+            return () -> selector.isNull() ? null : 
DimensionHandlerUtils.convertObjectToString(selector.getLong());
+          }
+        }
+
+        @Override
+        public Supplier<String> 
makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
+        {
+          return () -> null;
+        }
+      };
+
+  // currentIterator and currentEntry track iteration position through the 
currently-matched-rows.
+  // 1) currentEntry is the entry that our column selector factory is looking 
at right now.
+  // 2) currentIterator contains future matches that it _will_ be looking at 
after nextMatch() is called.
+  @Nullable
+  private Iterator<Map.Entry<String, String>> currentIterator = null;
+  private final SettableSupplier<Pair<String, String>> currentEntry = new 
SettableSupplier<>();
+
+  private final LookupExtractor extractor;
+  private final JoinConditionAnalysis condition;
+  private final List<Supplier<String>> keySuppliers;
+  private final ColumnSelectorFactory selectorFactory = new 
LookupColumnSelectorFactory(currentEntry::get);
+
+  // matchedKeys and matchingRemainder are used to implement matchRemainder().
+  private boolean matchingRemainder = false;
+  private final Set<String> matchedKeys;
+
+  private LookupJoinMatcher(
+      LookupExtractor extractor,
+      ColumnSelectorFactory leftSelectorFactory,
+      JoinConditionAnalysis condition,
+      @Nullable List<Expr> keyExprs,
+      boolean remainderNeeded
+  )
+  {
+    this.extractor = extractor;
+    this.matchedKeys = remainderNeeded && !condition.isAlwaysTrue() && 
!condition.isAlwaysFalse()
+                       ? new HashSet<>()
+                       : null;
+    this.condition = condition;
+
+    if (keyExprs != null) {
+      this.keySuppliers = keyExprs.stream()
+                                  .map(
+                                      expr ->
+                                          ColumnProcessors.makeProcessor(
+                                              expr,
+                                              ValueType.STRING,
+                                              LEFT_KEY_READER,
+                                              leftSelectorFactory
+                                          )
+                                  )
+                                  .collect(Collectors.toList());
+    } else {
+      // This check is to guard against bugs; users should never see it.
+      Preconditions.checkState(
+          condition.isAlwaysFalse() || condition.isAlwaysTrue(),
+          "Condition must be always true or always false when keySuppliers == 
null"
+      );
+
+      this.keySuppliers = null;
+    }
+
+    // Verify that extractor can be iterated when needed.
+    if (condition.isAlwaysTrue() || remainderNeeded) {
+      Preconditions.checkState(
+          extractor.canIterate(),
+          "Cannot iterate lookup, but iteration is required for this join"
+      );
+    }
+  }
+
+  public static LookupJoinMatcher create(
+      LookupExtractor extractor,
+      ColumnSelectorFactory leftSelectorFactory,
+      JoinConditionAnalysis condition,
+      boolean remainderNeeded
+  )
+  {
+    final List<Expr> keyExprs;
+
+    if (condition.isAlwaysTrue()) {
+      keyExprs = null;
+    } else if (condition.isAlwaysFalse()) {
+      keyExprs = null;
+    } else if (condition.getNonEquiConditions().isEmpty()
+               && condition.getEquiConditions()
+                           .stream()
+                           .allMatch(eq -> 
eq.getRightColumn().equals(LookupColumnSelectorFactory.KEY_COLUMN))) {
+      keyExprs = condition.getEquiConditions().stream()
+                          .map(Equality::getLeftExpr)
+                          .collect(Collectors.toList());
+    } else {
+      throw new IAE("Cannot join lookup with condition: %s", condition);
 
 Review comment:
   Suggest splitting the non-equi-condition check and the lookup key column 
check into separate branches, each with its own specific exception message.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to