jasonk000 commented on code in PR #18813:
URL: https://github.com/apache/druid/pull/18813#discussion_r2600426132
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramBufferAggregator.java:
##########
@@ -62,70 +57,41 @@ public void aggregate(ByteBuffer buffer, int position)
if (obj == null) {
return;
}
- SpectatorHistogram counts = histogramCache.get(buffer).get(position);
- if (obj instanceof SpectatorHistogram) {
- SpectatorHistogram other = (SpectatorHistogram) obj;
- counts.merge(other);
- } else if (obj instanceof Number) {
- counts.insert((Number) obj);
- } else {
- throw new IAE(
- "Expected a number or a long[], but received [%s] of type [%s]",
- obj,
- obj.getClass()
- );
- }
+ SpectatorHistogram counts = innerAggregator.get(buffer, position);
+ innerAggregator.merge(counts, obj);
}
@Override
public Object get(final ByteBuffer buffer, final int position)
{
- // histogramCache is an IdentityHashMap where the reference of buffer is
used for equality checks.
- // So the returned object isn't impacted by the changes in the buffer
object made by concurrent threads.
-
- SpectatorHistogram spectatorHistogram =
histogramCache.get(buffer).get(position);
- if (spectatorHistogram.isEmpty()) {
+ SpectatorHistogram histo = innerAggregator.get(buffer, position);
+ if (histo.isEmpty()) {
Review Comment:
I see this is in the previous code. Regardless, this state seems odd. Should
we actively remove it from the buffer if we're never going to expose it?
##########
benchmarks/src/test/java/org/apache/druid/benchmark/SpectatorHistogramAggregatorBenchmark.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import
org.apache.druid.spectator.histogram.SpectatorHistogramAggregatorFactory;
+import org.apache.druid.spectator.histogram.SpectatorHistogramModule;
+import org.apache.druid.timeline.SegmentId;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmark to compare performance of SpectatorHistogram aggregator with and
without vectorization.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1, jvmArgs = {
+ "--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED",
+ "--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED",
+ "--add-opens=java.base/java.nio=ALL-UNNAMED",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+ "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED",
+ "--add-opens=java.base/java.io=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang=ALL-UNNAMED",
+ "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED"
+})
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class SpectatorHistogramAggregatorBenchmark
+{
+ private static final Logger log = new
Logger(SpectatorHistogramAggregatorBenchmark.class);
+ private static final int RNG_SEED = 9999;
+ private static final IndexMergerV9 INDEX_MERGER_V9;
+ private static final IndexIO INDEX_IO;
+ public static final ObjectMapper JSON_MAPPER;
+
+ static {
+ JSON_MAPPER = new DefaultObjectMapper();
+ // Register the SpectatorHistogram Jackson modules and serde
+ SpectatorHistogramModule module = new SpectatorHistogramModule();
+ for (Module jacksonModule : module.getJacksonModules()) {
+ JSON_MAPPER.registerModule(jacksonModule);
+ }
+ SpectatorHistogramModule.registerSerde();
+
+ INDEX_IO = new IndexIO(
+ JSON_MAPPER,
+ new ColumnConfig()
+ {
+ }
+ );
+ INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO,
OffHeapMemorySegmentWriteOutMediumFactory.instance());
+ }
+
+ @Param({"1000000"})
+ private int rowsPerSegment;
+
+ @Param({"false", "true"})
+ private String vectorize;
+
+ @Param({"long1"})
+ private String metricName;
+
+ private AppendableIndexSpec appendableIndexSpec;
+ private AggregatorFactory spectatorHistogramFactory;
+ private DataGenerator generator;
+ private QueryRunnerFactory factory;
+ private GeneratorSchemaInfo schemaInfo;
+ private TimeseriesQuery query;
+
+ /**
+ * Setup everything common for benchmarking both the incremental-index and
the queryable-index.
+ */
+ @Setup
+ public void setup()
+ {
+ log.info("SETUP CALLED AT " + System.currentTimeMillis());
+
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+
+ spectatorHistogramFactory = new
SpectatorHistogramAggregatorFactory("spectatorHistogram", metricName);
+
+ QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
+ Collections.singletonList(schemaInfo.getDataInterval())
+ );
+
+ generator = new DataGenerator(
+ schemaInfo.getColumnSchemas(),
+ RNG_SEED,
+ schemaInfo.getDataInterval(),
+ rowsPerSegment
+ );
+
+ query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("blah")
+ .granularity(Granularities.ALL)
+ .intervals(intervalSpec)
+
.aggregators(Collections.singletonList(spectatorHistogramFactory))
+ .descending(false)
+ .build();
+
+ factory = new TimeseriesQueryRunnerFactory(
+ new TimeseriesQueryQueryToolChest(),
+ new TimeseriesQueryEngine(),
+ QueryBenchmarkUtil.NOOP_QUERYWATCHER
+ );
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexState
+ {
+ IncrementalIndex incIndex;
+
+ @Setup(Level.Invocation)
+ public void setup(SpectatorHistogramAggregatorBenchmark global)
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+ incIndex = global.makeIncIndex(global.spectatorHistogramFactory);
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+ }
+
+ @TearDown(Level.Invocation)
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the queryable-index.
+ */
+ @State(Scope.Benchmark)
+ public static class QueryableIndexState
+ {
+ private File qIndexesDir;
+ private QueryableIndex qIndex;
+
+ @Setup
+ public void setup(SpectatorHistogramAggregatorBenchmark global) throws
IOException
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+ IncrementalIndex incIndex =
global.makeIncIndex(global.spectatorHistogramFactory);
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+
+ qIndexesDir = FileUtils.createTempDir();
+ log.info("Using temp dir: " + qIndexesDir.getAbsolutePath());
+
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ qIndexesDir,
+ IndexSpec.getDefault(),
+ null
+ );
+ incIndex.close();
+
+ qIndex = INDEX_IO.loadIndex(indexFile);
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ if (qIndex != null) {
+ qIndex.close();
+ }
+ if (qIndexesDir != null) {
+ qIndexesDir.delete();
+ }
+ }
+ }
+
+ private IncrementalIndex makeIncIndex(AggregatorFactory metric)
+ {
+ return appendableIndexSpec.builder()
+ .setSimpleTestingIndexSchema(metric)
+ .setMaxRowCount(rowsPerSegment)
+ .build();
+ }
+
+ private static <T> List<T> runQuery(
+ QueryRunnerFactory factory,
+ QueryRunner runner,
+ Query<T> query,
+ String vectorize
+ )
+ {
+ QueryToolChest toolChest = factory.getToolchest();
+ QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
+ toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
+ toolChest
+ );
+
+ final QueryPlus<T> queryToRun = QueryPlus.wrap(
+ query.withOverriddenContext(
+ ImmutableMap.of(
+ QueryContexts.VECTORIZE_KEY, vectorize,
+ QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
+ )
+ )
+ );
+ Sequence<T> queryResult = theRunner.run(queryToRun,
ResponseContext.createEmpty());
+ return queryResult.toList();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void queryIncrementalIndex(Blackhole blackhole, IncrementalIndexState
state)
+ {
+ QueryRunner<Result<TimeseriesResultValue>> runner =
QueryBenchmarkUtil.makeQueryRunner(
+ factory,
+ SegmentId.dummy("incIndex"),
+ new IncrementalIndexSegment(state.incIndex,
SegmentId.dummy("incIndex"))
+ );
+
+ List<Result<TimeseriesResultValue>> results = runQuery(factory, runner,
query, vectorize);
+ blackhole.consume(results);
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void queryQueryableIndex(Blackhole blackhole, QueryableIndexState
state)
+ {
+ QueryRunner<Result<TimeseriesResultValue>> runner =
QueryBenchmarkUtil.makeQueryRunner(
+ factory,
+ SegmentId.dummy("qIndex"),
+ new QueryableIndexSegment(state.qIndex, SegmentId.dummy("qIndex"))
+ );
+
+ List<Result<TimeseriesResultValue>> results = runQuery(factory, runner,
query, vectorize);
+ blackhole.consume(results);
+ }
+
+ public static void main(String[] args) throws RunnerException
+ {
+ Options opt = new OptionsBuilder()
+ .include(SpectatorHistogramAggregatorBenchmark.class.getSimpleName())
+ .forks(1)
+ .warmupIterations(1)
+ .measurementIterations(2)
Review Comment:
A main is likely not required since we can use a compiled jmh-jar. If it is,
then these numbers should be higher by default; I doubt there is enough time
here for them to consistently settle down (warmup) and mitigate any variance
(fork + measurement).
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregateHelper.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.java.util.common.IAE;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+/**
+ * Helper class for Spectator histogram implementations of {@link
org.apache.druid.query.aggregation.BufferAggregator} and {@link
org.apache.druid.query.aggregation.VectorAggregator}.
+ */
+public class SpectatorHistogramAggregateHelper
+{
+ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<SpectatorHistogram>>
histogramCache = new IdentityHashMap<>();
+
+ public void init(ByteBuffer buffer, int position)
+ {
+ SpectatorHistogram emptyCounts = new SpectatorHistogram();
+ addToCache(buffer, position, emptyCounts);
+ }
+
+ /**
+ * Merge obj ({@link SpectatorHistogram} or {@link Object}) into {@param
current}.
+ */
+ public void merge(SpectatorHistogram current, Object obj)
+ {
+ if (obj instanceof SpectatorHistogram) {
+ SpectatorHistogram other = (SpectatorHistogram) obj;
+ current.merge(other);
+ } else if (obj instanceof Number) {
+ current.insert((Number) obj);
+ } else {
+ throw new IAE(
+ "Expected a Number, but received [%s] of type [%s]",
+ obj,
+ obj.getClass()
+ );
+ }
+ }
+
+ /**
+ * Merge {@param value} into {@param current}.
+ */
+ public void merge(SpectatorHistogram current, long value)
+ {
+ current.insert(value);
+ }
+
+ /**
+ * Fetches the SpectatorHistogram at the given buffer/position pair in the
cache
+ */
+ public SpectatorHistogram get(final ByteBuffer buffer, final int position)
+ {
+ // histogramCache is an IdentityHashMap where the reference of buffer is
used for equality checks.
+ // So the returned object isn't impacted by the changes in the buffer
object made by concurrent threads.
+ final Int2ObjectMap<SpectatorHistogram> map = histogramCache.get(buffer);
+ if (map == null) {
+ return null;
+ }
+ return map.get(position);
+ }
+
+ /**
+ * Fetches the SpectatorHistogram cache for the given buffer
+ */
+ public Int2ObjectMap<SpectatorHistogram> get(final ByteBuffer buffer)
+ {
+ return histogramCache.get(buffer);
+ }
+
+ public float getFloat(final ByteBuffer buffer, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ public long getLong(final ByteBuffer buffer, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
+ * Resets the helper by clearing the buffer/histogram cache.
+ */
+ public void close()
+ {
+ histogramCache.clear();
+ }
+
+ /**
+ * Move histogram located at {@param oldBuffer} in position {@param
oldPosition} to {@param newBuffer} in position {@param newPosition}.
+ */
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer,
ByteBuffer newBuffer)
+ {
+ final SpectatorHistogram histogram =
histogramCache.get(oldBuffer).get(oldPosition);
+ addToCache(newBuffer, newPosition, histogram);
+
+ final Int2ObjectMap<SpectatorHistogram> map =
histogramCache.get(oldBuffer);
+ map.remove(oldPosition);
+ if (map.isEmpty()) {
+ histogramCache.remove(oldBuffer);
+ }
+ }
+
+ private void addToCache(final ByteBuffer buffer, final int position, final
SpectatorHistogram histogram)
+ {
+ Int2ObjectMap<SpectatorHistogram> map = histogramCache.computeIfAbsent(
Review Comment:
Does this need to be concurrent-capable?
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregateHelper.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.java.util.common.IAE;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+/**
+ * Helper class for Spectator histogram implementations of {@link
org.apache.druid.query.aggregation.BufferAggregator} and {@link
org.apache.druid.query.aggregation.VectorAggregator}.
+ */
+public class SpectatorHistogramAggregateHelper
+{
+ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<SpectatorHistogram>>
histogramCache = new IdentityHashMap<>();
+
+ public void init(ByteBuffer buffer, int position)
+ {
+ SpectatorHistogram emptyCounts = new SpectatorHistogram();
+ addToCache(buffer, position, emptyCounts);
+ }
+
+ /**
+ * Merge obj ({@link SpectatorHistogram} or {@link Object}) into {@param
current}.
+ */
+ public void merge(SpectatorHistogram current, Object obj)
+ {
+ if (obj instanceof SpectatorHistogram) {
+ SpectatorHistogram other = (SpectatorHistogram) obj;
+ current.merge(other);
+ } else if (obj instanceof Number) {
+ current.insert((Number) obj);
+ } else {
+ throw new IAE(
+ "Expected a Number, but received [%s] of type [%s]",
+ obj,
+ obj.getClass()
+ );
+ }
+ }
+
+ /**
+ * Merge {@param value} into {@param current}.
+ */
+ public void merge(SpectatorHistogram current, long value)
+ {
+ current.insert(value);
+ }
+
+ /**
+ * Fetches the SpectatorHistogram at the given buffer/position pair in the
cache
+ */
+ public SpectatorHistogram get(final ByteBuffer buffer, final int position)
+ {
+ // histogramCache is an IdentityHashMap where the reference of buffer is
used for equality checks.
+ // So the returned object isn't impacted by the changes in the buffer
object made by concurrent threads.
Review Comment:
I don't understand the significance of this comment. Do we expect concurrent
read/write of the underlying Int2ObjectMap? (If so, does it need
synchronization?)
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramNumericVectorizedAggregator.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/*
+ Aggregator used during ingestion time when aggregating against numeric
values.
+*/
+public class SpectatorHistogramNumericVectorizedAggregator implements
VectorAggregator
+{
+ private final SpectatorHistogramAggregateHelper innerAggregator = new
SpectatorHistogramAggregateHelper();
+ private final VectorValueSelector selector;
+
+ public SpectatorHistogramNumericVectorizedAggregator(VectorValueSelector
selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void init(ByteBuffer buffer, int position)
+ {
+ // Map buf to a new histogram
+ innerAggregator.init(buffer, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buffer, int position, int startRow, int
endRow)
+ {
+ boolean[] isNull = selector.getNullVector();
+ long[] vector = selector.getLongVector();
+ boolean hasNulls = isNull != null;
+ final SpectatorHistogram histogram = innerAggregator.get(buffer, position);
+
+ for (int i = startRow; i < endRow; ++i) {
+ if (!hasNulls || !isNull[i]) {
+ innerAggregator.merge(histogram, vector[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buffer, int numRows, int[] positions,
@Nullable int[] rows, int positionOffset)
Review Comment:
Style: would renaming `int[] rows` to `int[] rowMap` or `int[] rowIndexMap` be clearer?
##########
extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregateHelperTest.java:
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.netflix.spectator.api.histogram.PercentileBuckets;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class SpectatorHistogramAggregateHelperTest extends
InitializedNullHandlingTest
+{
+ private static final int POSITION = 0;
+ private static final int POSITION_2 = 1;
+
+ private SpectatorHistogramAggregateHelper helper;
+ private ByteBuffer buffer;
+
+ @Before
+ public void setUp()
+ {
+ helper = new SpectatorHistogramAggregateHelper();
+ buffer = ByteBuffer.allocate(1024);
+ }
+
+ @After
+ public void tearDown()
+ {
+ helper.close();
+ }
+
+ @Test
+ public void testInitCreatesEmptyHistogram()
+ {
+ helper.init(buffer, POSITION);
+ SpectatorHistogram result = helper.get(buffer, POSITION);
+
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testMergeWithSpectatorHistogramObject()
+ {
+ SpectatorHistogram other = new SpectatorHistogram();
+ other.add(PercentileBuckets.indexOf(100), 5L);
+ other.add(PercentileBuckets.indexOf(200), 3L);
+
+ helper.init(buffer, POSITION);
+ SpectatorHistogram histogram = helper.get(buffer, POSITION);
+ helper.merge(histogram, other);
+
+ SpectatorHistogram result = helper.get(buffer, POSITION);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(other, result);
+ }
+
+ @Test
+ public void testMergeWithLongValues()
+ {
+ helper.init(buffer, POSITION);
+ SpectatorHistogram histogram = helper.get(buffer, POSITION);
+ helper.merge(histogram, 100L);
+ helper.merge(histogram, 200L);
+ helper.merge(histogram, 100L);
+
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.insert(100L);
+ expected.insert(200L);
+ expected.insert(100L);
+
+ SpectatorHistogram result = helper.get(buffer, POSITION);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMergeWithSpectatorHistogram()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(100), 5L);
+
+ SpectatorHistogram other = new SpectatorHistogram();
+ other.add(PercentileBuckets.indexOf(100), 3L);
+ other.add(PercentileBuckets.indexOf(200), 2L);
+
+ helper.merge(histogram, other);
+
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(100), 8L);
+ expected.add(PercentileBuckets.indexOf(200), 2L);
+
+ Assert.assertEquals(expected, histogram);
+ }
+
+ @Test
+ public void testMergeWithNumber()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+
+ helper.merge(histogram, 100);
+ helper.merge(histogram, 200L);
+ helper.merge(histogram, 300.0);
+
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.insert(100);
+ expected.insert(200L);
+ expected.insert(300.0);
+
+ Assert.assertEquals(expected, histogram);
Review Comment:
I'm not sure this assert is testing anything. Probably the `expected` histogram
should be built using all primitive values.
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregateHelper.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.java.util.common.IAE;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+/**
+ * Helper class for Spectator histogram implementations of {@link
org.apache.druid.query.aggregation.BufferAggregator} and {@link
org.apache.druid.query.aggregation.VectorAggregator}.
+ */
+public class SpectatorHistogramAggregateHelper
+{
+ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<SpectatorHistogram>>
histogramCache = new IdentityHashMap<>();
+
+ public void init(ByteBuffer buffer, int position)
+ {
+ SpectatorHistogram emptyCounts = new SpectatorHistogram();
+ addToCache(buffer, position, emptyCounts);
+ }
+
+ /**
+ * Merge obj ({@link SpectatorHistogram} or {@link Object}) into {@param
current}.
+ */
+ public void merge(SpectatorHistogram current, Object obj)
+ {
+ if (obj instanceof SpectatorHistogram) {
+ SpectatorHistogram other = (SpectatorHistogram) obj;
+ current.merge(other);
+ } else if (obj instanceof Number) {
+ current.insert((Number) obj);
+ } else {
+ throw new IAE(
+ "Expected a Number, but received [%s] of type [%s]",
+ obj,
+ obj.getClass()
+ );
+ }
+ }
+
+ /**
+ * Merge {@param value} into {@param current}.
+ */
+ public void merge(SpectatorHistogram current, long value)
+ {
+ current.insert(value);
Review Comment:
(Refer to the comment above — I think this will end up allocating in
`SpectatorHistogram::insert`, as there is no primitive `long` overload.)
##########
benchmarks/src/test/java/org/apache/druid/benchmark/SpectatorHistogramAggregatorBenchmark.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import
org.apache.druid.spectator.histogram.SpectatorHistogramAggregatorFactory;
+import org.apache.druid.spectator.histogram.SpectatorHistogramModule;
+import org.apache.druid.timeline.SegmentId;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmark to compare performance of SpectatorHistogram aggregator with and
without vectorization.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1, jvmArgs = {
+ "--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED",
+ "--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED",
+ "--add-opens=java.base/java.nio=ALL-UNNAMED",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+ "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED",
+ "--add-opens=java.base/java.io=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang=ALL-UNNAMED",
+ "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED"
+})
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class SpectatorHistogramAggregatorBenchmark
+{
+ private static final Logger log = new
Logger(SpectatorHistogramAggregatorBenchmark.class);
+ private static final int RNG_SEED = 9999;
+ private static final IndexMergerV9 INDEX_MERGER_V9;
+ private static final IndexIO INDEX_IO;
+ public static final ObjectMapper JSON_MAPPER;
+
+ static {
+ JSON_MAPPER = new DefaultObjectMapper();
+ // Register the SpectatorHistogram Jackson modules and serde
+ SpectatorHistogramModule module = new SpectatorHistogramModule();
+ for (Module jacksonModule : module.getJacksonModules()) {
+ JSON_MAPPER.registerModule(jacksonModule);
+ }
+ SpectatorHistogramModule.registerSerde();
+
+ INDEX_IO = new IndexIO(
+ JSON_MAPPER,
+ new ColumnConfig()
+ {
+ }
+ );
+ INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO,
OffHeapMemorySegmentWriteOutMediumFactory.instance());
+ }
+
+ @Param({"1000000"})
+ private int rowsPerSegment;
+
+ @Param({"false", "true"})
+ private String vectorize;
+
+ @Param({"long1"})
+ private String metricName;
+
+ private AppendableIndexSpec appendableIndexSpec;
+ private AggregatorFactory spectatorHistogramFactory;
+ private DataGenerator generator;
+ private QueryRunnerFactory factory;
+ private GeneratorSchemaInfo schemaInfo;
+ private TimeseriesQuery query;
+
+ /**
+ * Setup everything common for benchmarking both the incremental-index and
the queryable-index.
+ */
+ @Setup
+ public void setup()
+ {
+ log.info("SETUP CALLED AT " + System.currentTimeMillis());
Review Comment:
What's the purpose of this during the JMH run?
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogram.java:
##########
@@ -276,7 +276,7 @@ byte[] toBytes()
return Arrays.copyOf(buffer.array(), buffer.position());
}
- void insert(Number num)
+ public void insert(Number num)
Review Comment:
We should add another `public void insert(long value)` to avoid the implicit
Long/Number allocation.
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramBufferAggregator.java:
##########
@@ -62,70 +57,41 @@ public void aggregate(ByteBuffer buffer, int position)
if (obj == null) {
return;
}
- SpectatorHistogram counts = histogramCache.get(buffer).get(position);
- if (obj instanceof SpectatorHistogram) {
- SpectatorHistogram other = (SpectatorHistogram) obj;
- counts.merge(other);
- } else if (obj instanceof Number) {
- counts.insert((Number) obj);
- } else {
- throw new IAE(
- "Expected a number or a long[], but received [%s] of type [%s]",
- obj,
- obj.getClass()
- );
- }
+ SpectatorHistogram counts = innerAggregator.get(buffer, position);
+ innerAggregator.merge(counts, obj);
}
@Override
public Object get(final ByteBuffer buffer, final int position)
{
- // histogramCache is an IdentityHashMap where the reference of buffer is
used for equality checks.
- // So the returned object isn't impacted by the changes in the buffer
object made by concurrent threads.
-
- SpectatorHistogram spectatorHistogram =
histogramCache.get(buffer).get(position);
- if (spectatorHistogram.isEmpty()) {
+ SpectatorHistogram histo = innerAggregator.get(buffer, position);
Review Comment:
It looks like `innerAggregator.get()` can return null, in which case the
next line, `if (histo.isEmpty()) {` might NPE.
##########
extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregateHelperTest.java:
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.netflix.spectator.api.histogram.PercentileBuckets;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class SpectatorHistogramAggregateHelperTest extends
InitializedNullHandlingTest
+{
+ private static final int POSITION = 0;
+ private static final int POSITION_2 = 1;
+
+ private SpectatorHistogramAggregateHelper helper;
+ private ByteBuffer buffer;
+
+ @Before
+ public void setUp()
+ {
+ helper = new SpectatorHistogramAggregateHelper();
+ buffer = ByteBuffer.allocate(1024);
+ }
+
+ @After
+ public void tearDown()
+ {
+ helper.close();
+ }
+
+ @Test
+ public void testInitCreatesEmptyHistogram()
+ {
+ helper.init(buffer, POSITION);
+ SpectatorHistogram result = helper.get(buffer, POSITION);
+
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testMergeWithSpectatorHistogramObject()
+ {
+ SpectatorHistogram other = new SpectatorHistogram();
+ other.add(PercentileBuckets.indexOf(100), 5L);
+ other.add(PercentileBuckets.indexOf(200), 3L);
+
+ helper.init(buffer, POSITION);
+ SpectatorHistogram histogram = helper.get(buffer, POSITION);
+ helper.merge(histogram, other);
+
+ SpectatorHistogram result = helper.get(buffer, POSITION);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(other, result);
+ }
+
+ @Test
+ public void testMergeWithLongValues()
+ {
+ helper.init(buffer, POSITION);
+ SpectatorHistogram histogram = helper.get(buffer, POSITION);
+ helper.merge(histogram, 100L);
+ helper.merge(histogram, 200L);
+ helper.merge(histogram, 100L);
+
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.insert(100L);
+ expected.insert(200L);
+ expected.insert(100L);
+
+ SpectatorHistogram result = helper.get(buffer, POSITION);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMergeWithSpectatorHistogram()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(100), 5L);
+
+ SpectatorHistogram other = new SpectatorHistogram();
+ other.add(PercentileBuckets.indexOf(100), 3L);
+ other.add(PercentileBuckets.indexOf(200), 2L);
+
+ helper.merge(histogram, other);
+
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(100), 8L);
+ expected.add(PercentileBuckets.indexOf(200), 2L);
+
+ Assert.assertEquals(expected, histogram);
+ }
+
+ @Test
+ public void testMergeWithNumber()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+
+ helper.merge(histogram, 100);
+ helper.merge(histogram, 200L);
+ helper.merge(histogram, 300.0);
Review Comment:
I think this `300.0` is implicitly being cast to Number? Might be clearer as
`Float.valueOf(300)`.
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregateHelper.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.java.util.common.IAE;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+/**
+ * Helper class for Spectator histogram implementations of {@link
org.apache.druid.query.aggregation.BufferAggregator} and {@link
org.apache.druid.query.aggregation.VectorAggregator}.
+ */
+public class SpectatorHistogramAggregateHelper
+{
+ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<SpectatorHistogram>>
histogramCache = new IdentityHashMap<>();
+
+ public void init(ByteBuffer buffer, int position)
+ {
+ SpectatorHistogram emptyCounts = new SpectatorHistogram();
+ addToCache(buffer, position, emptyCounts);
+ }
+
+ /**
+ * Merge obj ({@link SpectatorHistogram} or {@link Object}) into {@param
current}.
Review Comment:
nit: SpectatorHistogram `or Number`
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramNumericVectorizedAggregator.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/*
+ Aggregator used during ingestion time when aggregating against numeric
values.
+*/
+public class SpectatorHistogramNumericVectorizedAggregator implements
VectorAggregator
+{
+ private final SpectatorHistogramAggregateHelper innerAggregator = new
SpectatorHistogramAggregateHelper();
+ private final VectorValueSelector selector;
+
+ public SpectatorHistogramNumericVectorizedAggregator(VectorValueSelector
selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void init(ByteBuffer buffer, int position)
+ {
+ // Map buf to a new histogram
+ innerAggregator.init(buffer, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buffer, int position, int startRow, int
endRow)
+ {
+ boolean[] isNull = selector.getNullVector();
+ long[] vector = selector.getLongVector();
+ boolean hasNulls = isNull != null;
+ final SpectatorHistogram histogram = innerAggregator.get(buffer, position);
+
+ for (int i = startRow; i < endRow; ++i) {
+ if (!hasNulls || !isNull[i]) {
+ innerAggregator.merge(histogram, vector[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buffer, int numRows, int[] positions,
@Nullable int[] rows, int positionOffset)
+ {
+ boolean[] isNull = selector.getNullVector();
+ long[] vector = selector.getLongVector();
+ boolean hasNulls = isNull != null;
+
+ final Int2ObjectMap<SpectatorHistogram> histMap =
innerAggregator.get(buffer);
+ for (int i = 0; i < numRows; ++i) {
+ int rowIndex = rows != null ? rows[i] : i;
+ if (!hasNulls || !isNull[rowIndex]) {
+ int position = positions[i] + positionOffset;
+ innerAggregator.merge(histMap.get(position), vector[rowIndex]);
Review Comment:
perf: Are we likely in practice to see long runs where `position` ends up
loading the same map repeatedly? Not sure if worth caching this or how likely
this occurrence is.
##########
extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramNumericVectorizedAggregator.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/*
+ Aggregator used during ingestion time when aggregating against numeric
values.
+*/
+public class SpectatorHistogramNumericVectorizedAggregator implements
VectorAggregator
+{
+ private final SpectatorHistogramAggregateHelper innerAggregator = new
SpectatorHistogramAggregateHelper();
+ private final VectorValueSelector selector;
+
+ public SpectatorHistogramNumericVectorizedAggregator(VectorValueSelector
selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void init(ByteBuffer buffer, int position)
+ {
+ // Map buf to a new histogram
+ innerAggregator.init(buffer, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buffer, int position, int startRow, int
endRow)
+ {
+ boolean[] isNull = selector.getNullVector();
+ long[] vector = selector.getLongVector();
+ boolean hasNulls = isNull != null;
+ final SpectatorHistogram histogram = innerAggregator.get(buffer, position);
+
+ for (int i = startRow; i < endRow; ++i) {
+ if (!hasNulls || !isNull[i]) {
Review Comment:
style: personally I think this (and other examples) would be more readable as
```
boolean rowIsNull = hasNulls && isNull[i];
if (!rowIsNull) {
...
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]