github-advanced-security[bot] commented on code in PR #19579:
URL: https://github.com/apache/druid/pull/19579#discussion_r3420357309


##########
processing/src/main/java/org/apache/druid/segment/incremental/OnHeapClusteredBaseTable.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Ordering;
+import org.apache.druid.data.input.InputRow;
+import 
org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionDictionary;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.SortedDimensionDictionary;
+import org.apache.druid.segment.StringDimensionDictionary;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import 
org.apache.druid.segment.projections.ClusteredValueGroupsBaseTableSchema;
+import org.apache.druid.segment.projections.ClusteringDictionaries;
+import org.apache.druid.segment.projections.TableClusterGroupSpec;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Segment-wide root of the clustered base-table machinery for {@link 
OnheapIncrementalIndex}: holds the per-type
+ * shared clustering dictionaries, the clustering virtual-column selector 
factory, and the map of
+ * {@link OnHeapClusterGroup} instances keyed by the clustering-value 
dictionary-ID tuple. The parent
+ * {@code OnheapIncrementalIndex} delegates row ingestion here when its schema 
carries a
+ * {@link ClusteredValueGroupsBaseTableProjectionSpec}, and the base facts 
holder stays empty in that mode.
+ * <p>
+ * On row arrival: each clustering column's value is resolved through the 
virtual-column selector, then
+ * dictionary-encoded into the per-type dictionary for the column's clustering 
type. The resulting dictionary-ID tuple
+ * becomes the lookup key for the {@link OnHeapClusterGroup} that owns the 
row; new groups are materialized lazily as
+ * new tuples arrive.
+ * <p>
+ * Dictionaries here are insertion-order (id = first-seen position). The 
persist path sorts + remaps them into the
+ * read-side {@link ClusteringDictionaries} shape (sorted, nulls first) at 
segment-write time.
+ */
+public final class OnHeapClusteredBaseTable
+{
+  private static final Comparator<TableClusterGroupSpec> 
BY_CLUSTERING_VALUE_IDS =
+      Comparator.comparing(TableClusterGroupSpec::getClusteringValueIds, 
Ordering.<Integer>natural().lexicographical());
+
+  private final ClusteredValueGroupsBaseTableProjectionSpec spec;
+  private final RowSignature clusteringColumns;
+  private final VirtualColumns virtualColumns;
+  private final ColumnSelectorFactory virtualSelectorFactory;
+  private final IncrementalIndex.InputRowHolder inputRowHolder;
+
+  // template state for instantiating new OnHeapClusterGroups
+  private final List<DimensionSchema> nonClusteringDimensions;
+  private final boolean rollup;
+  private final int timePosition;
+
+  private final DimensionDictionary<String> stringDictionary = new 
StringDimensionDictionary();
+  private final DimensionDictionary<Long> longDictionary = new 
DimensionDictionary<>(Long.class)
+  {
+    @Override
+    public long estimateSizeOfValue(Long value)
+    {
+      return Long.BYTES;
+    }
+  };
+  private final DimensionDictionary<Double> doubleDictionary = new 
DimensionDictionary<>(Double.class)
+  {
+    @Override
+    public long estimateSizeOfValue(Double value)
+    {
+      return Double.BYTES;
+    }
+  };
+  private final DimensionDictionary<Float> floatDictionary = new 
DimensionDictionary<>(Float.class)
+  {
+    @Override
+    public long estimateSizeOfValue(Float value)
+    {
+      return Float.BYTES;
+    }
+  };
+
+  // Keyed by the clustering-value dictionary-ID tuple
+  private final ConcurrentHashMap<List<Integer>, OnHeapClusterGroup> groups = 
new ConcurrentHashMap<>();
+  private final AtomicInteger totalNumRows = new AtomicInteger(0);
+  // min/max bucketed row timestamp across all groups; the base facts holder 
is empty in clustered mode so the
+  // parent index's interval accessors delegate here instead.
+  private final AtomicLong minTimeMillis = new AtomicLong(Long.MAX_VALUE);
+  private final AtomicLong maxTimeMillis = new AtomicLong(Long.MIN_VALUE);
+
+  public OnHeapClusteredBaseTable(
+      ClusteredValueGroupsBaseTableProjectionSpec spec,
+      VirtualColumns segmentVirtualColumns,
+      IncrementalIndex.InputRowHolder inputRowHolder,
+      boolean rollup,
+      int timePosition
+  )
+  {
+    this.spec = spec;
+    this.inputRowHolder = inputRowHolder;
+    this.rollup = rollup;
+    this.timePosition = timePosition;
+    this.nonClusteringDimensions = Collections.unmodifiableList(
+        new ArrayList<>(spec.getNonClusteringColumns())
+    );
+
+    final RowSignature.Builder sigBuilder = RowSignature.builder();
+    for (DimensionSchema c : spec.getClusteringColumns()) {
+      sigBuilder.add(c.getName(), c.getColumnType());
+    }
+    this.clusteringColumns = sigBuilder.build();
+
+    this.virtualColumns = mergeVirtualColumns(segmentVirtualColumns, 
spec.getVirtualColumns());
+    this.virtualSelectorFactory = new 
OnheapIncrementalIndex.CachingColumnSelectorFactory(
+        IncrementalIndex.makeColumnSelectorFactory(this.virtualColumns, 
inputRowHolder, null)
+    );
+  }
+
+  /**
+   * Resolve the clustering tuple for {@code row}, locate (or create) the 
matching {@link OnHeapClusterGroup}, and
+   * dispatch the row to it. {@code key} carries the bucketed timestamp from 
the parent's {@code toIncrementalIndexRow}
+   * pre-processing; its dim slots are ignored here since in clustered mode 
the non-clustering data lives only on the
+   * per-group facts holders. Returns true when the row created a new fact 
entry in its group (vs. rolling up into an
+   * existing one).
+   */
+  boolean addToFacts(
+      IncrementalIndexRow key,
+      InputRow row,
+      List<String> parseExceptionMessages,
+      AtomicLong totalSizeInBytes
+  )
+  {
+    final long clusteringDictSizeBefore = clusteringDictionariesSizeInBytes();
+    final Object[] clusteringValues = new Object[clusteringColumns.size()];
+    final List<Integer> clusteringValueIds = new 
ArrayList<>(clusteringColumns.size());
+    for (int i = 0; i < clusteringColumns.size(); i++) {
+      final String name = clusteringColumns.getColumnName(i);
+      final ColumnType type = clusteringColumns.getColumnType(i)
+                                               .orElseThrow(() -> 
DruidException.defensive(
+                                                   "clustering column [%s] has 
no type",
+                                                   name
+                                               ));
+      final ColumnValueSelector<?> selector = 
virtualSelectorFactory.makeColumnValueSelector(name);
+      final Object raw = selector.getObject();
+      Object coerced;
+      try {
+        coerced = DimensionHandlerUtils.convertObjectToType(raw, type, true, 
name);
+      }
+      catch (ParseException pe) {
+        parseExceptionMessages.add(pe.getMessage());
+        coerced = null;
+      }
+      clusteringValues[i] = coerced;
+      clusteringValueIds.add(internClusteringValue(type, coerced));
+    }
+    totalSizeInBytes.addAndGet(clusteringDictionariesSizeInBytes() - 
clusteringDictSizeBefore);
+
+    final boolean[] groupCreated = {false};
+    final OnHeapClusterGroup group = groups.computeIfAbsent(
+        clusteringValueIds,
+        ids -> {
+          groupCreated[0] = true;
+          return new OnHeapClusterGroup(
+              this,
+              clusteringValues,
+              ids,
+              nonClusteringDimensions,
+              virtualColumns,
+              inputRowHolder,
+              rollup,
+              timePosition
+          );
+        }
+    );
+    if (groupCreated[0]) {
+      totalSizeInBytes.addAndGet(estimateNewGroupOverhead());
+    }
+    final boolean isNewEntry = group.addToFacts(row, key.getTimestamp(), 
parseExceptionMessages, totalSizeInBytes);
+    totalNumRows.incrementAndGet();
+    minTimeMillis.accumulateAndGet(key.getTimestamp(), Math::min);
+    maxTimeMillis.accumulateAndGet(key.getTimestamp(), Math::max);
+    return isNewEntry;
+  }
+
+  /**
+   * Rough running-heap cost of a newly-created cluster group beyond its rows: 
the {@link #groups}-map node, the boxed
+   * clustering-id tuple held as the map key, the per-group clustering-values 
array, and the group object's fixed
+   * structures.
+   */
+  private long estimateNewGroupOverhead()
+  {
+    final int numClusteringColumns = clusteringColumns.size();
+    return OnheapIncrementalIndex.ROUGH_OVERHEAD_PER_MAP_ENTRY
+           + (long) numClusteringColumns * (Long.BYTES * 2 + Long.BYTES)

Review Comment:
   ## CodeQL / Result of multiplication cast to wider type
   
   Potential overflow in int multiplication before it is converted to long
   by use in a numeric context.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/11309)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to