Eshcar commented on a change in pull request #10593:
URL: https://github.com/apache/druid/pull/10593#discussion_r532214571



##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
##########
@@ -227,10 +228,10 @@ public void tearDown() throws IOException
 
   private IncrementalIndex makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()

Review comment:
       shouldn't this method take a parameter to decide which type of index to 
return? 
   or is this the default builder?
   then maybe buildDefaultIncIndex and the default should be some hard coded 
value that can be changed over time

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
##########
@@ -205,30 +184,106 @@ public void setup() throws IOException
 
     GeneratorSchemaInfo basicSchema = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
     QuerySegmentSpec intervalSpec = new 
MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
-    List<AggregatorFactory> queryAggs = new ArrayList<>();
-    queryAggs.add(filteredMetrics[0]);
+    List<AggregatorFactory> queryAggs = 
Collections.singletonList(filteredMetric);
 
     query = Druids.newTimeseriesQueryBuilder()
                   .dataSource("blah")
                   .granularity(Granularities.ALL)
                   .intervals(intervalSpec)
                   .aggregators(queryAggs)
-                  .descending(false)
+                  .descending(descending)
                   .build();
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
+  {
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(FilteredAggregatorBenchmark global) throws 
JsonProcessingException
+    {
+      global.appendableIndexSpec = 
IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      incIndex.close();
+    }
+  }
+
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexIngestState
   {
-    FileUtils.deleteDirectory(tmpDir);

Review comment:
       the diff here is very misleading - this line is part of a one line 
method tearDown that was deleted

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
##########
@@ -205,30 +184,106 @@ public void setup() throws IOException
 
     GeneratorSchemaInfo basicSchema = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
     QuerySegmentSpec intervalSpec = new 
MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
-    List<AggregatorFactory> queryAggs = new ArrayList<>();
-    queryAggs.add(filteredMetrics[0]);
+    List<AggregatorFactory> queryAggs = 
Collections.singletonList(filteredMetric);
 
     query = Druids.newTimeseriesQueryBuilder()
                   .dataSource("blah")
                   .granularity(Granularities.ALL)
                   .intervals(intervalSpec)
                   .aggregators(queryAggs)
-                  .descending(false)
+                  .descending(descending)
                   .build();
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
+  {
+    @Param({"onheap", "offheap"})
+    private String indexType;

Review comment:
       since now there is a new extension point for incremental index, 
shouldn't the type be extendable as well? 
   use enum instead of string and names like defaultOnHeap and OakOffHeap so 
additional on/off-heap implementations can be added in the future 

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
##########
@@ -205,30 +184,106 @@ public void setup() throws IOException
 
     GeneratorSchemaInfo basicSchema = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
     QuerySegmentSpec intervalSpec = new 
MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
-    List<AggregatorFactory> queryAggs = new ArrayList<>();
-    queryAggs.add(filteredMetrics[0]);
+    List<AggregatorFactory> queryAggs = 
Collections.singletonList(filteredMetric);
 
     query = Druids.newTimeseriesQueryBuilder()
                   .dataSource("blah")
                   .granularity(Granularities.ALL)
                   .intervals(intervalSpec)
                   .aggregators(queryAggs)
-                  .descending(false)
+                  .descending(descending)
                   .build();
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
+  {
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(FilteredAggregatorBenchmark global) throws 
JsonProcessingException
+    {
+      global.appendableIndexSpec = 
IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      incIndex.close();
+    }
+  }
+
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexIngestState
   {
-    FileUtils.deleteDirectory(tmpDir);
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+    List<InputRow> inputRows;
+
+    @Setup(Level.Invocation)
+    public void setup(FilteredAggregatorBenchmark global) throws 
JsonProcessingException
+    {
+      global.appendableIndexSpec = 
IncrementalIndexCreator.parseIndexType(indexType);
+      inputRows = global.generator.toList(global.rowsPerSegment);
+      incIndex = global.makeIncIndex(new 
AggregatorFactory[]{global.filteredMetric});
+    }
+
+    @TearDown(Level.Invocation)
+    public void tearDown()
+    {
+      incIndex.close();
+    }
+  }
+
+  @State(Scope.Benchmark)
+  public static class QueryableIndexState
+  {
+    private File qIndexesDir;
+    private QueryableIndex qIndex;
+
+    @Setup
+    public void setup(FilteredAggregatorBenchmark global) throws IOException
+    {
+      global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+      IncrementalIndex<?> incIndex = 
global.makeIncIndex(global.schemaInfo.getAggsArray());
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+
+      qIndexesDir = FileUtils.createTempDir();
+      log.info("Using temp dir: " + qIndexesDir.getAbsolutePath());
+
+      File indexFile = INDEX_MERGER_V9.persist(
+          incIndex,
+          qIndexesDir,
+          new IndexSpec(),
+          null
+      );
+      incIndex.close();
+
+      qIndex = INDEX_IO.loadIndex(indexFile);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      qIndex.close();

Review comment:
       no option for qIndex to be null? e.g., if indexFile is empty?

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
##########
@@ -149,39 +138,44 @@ public void teardown()
     incIndex = null;
   }
 
-  private IncrementalIndex makeIncIndex()
+  @Setup(Level.Invocation)
+  public void setupTemp()
+  {
+    tmpDir = FileUtils.createTempDir();
+    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
+  }
+
+  @TearDown(Level.Invocation)
+  public void teardownTemp() throws IOException
   {
-    return new IncrementalIndex.Builder()
+    FileUtils.deleteDirectory(tmpDir);

Review comment:
       nice handling of the file

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
##########
@@ -428,14 +424,12 @@ private void setupQueries()
   }
 
   @Setup(Level.Trial)
-  public void setup() throws IOException
+  public void setup()
   {
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    executorService = Execs.multiThreaded(numProcessingThreads, 
"GroupByThreadPool[%d]");

Review comment:
       from this point onward a bit hard to follow the reasoning for the 
changes? what part of the PR description does this relate to? 

##########
File path: 
processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -150,18 +150,13 @@ protected AddToFactsResult addToFacts(
       boolean skipMaxRowsInMemoryCheck // ignored, we always want to check 
this for offheap
   ) throws IndexSizeExceededException
   {
-    ByteBuffer aggBuffer;
-    int bufferIndex;
-    int bufferOffset;
-
     synchronized (this) {
       final AggregatorFactory[] metrics = getMetrics();
       final int priorIndex = facts.getPriorIndex(key);
       if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
         final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
-        bufferIndex = indexAndOffset[0];
-        bufferOffset = indexAndOffset[1];
-        aggBuffer = aggBuffers.get(bufferIndex).get();
+        ByteBuffer aggBuffer = aggBuffers.get(indexAndOffset[0]).get();

Review comment:
       what's the reasoning for these changes? add documentation to explain

##########
File path: 
processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Since the off-heap incremental index is not yet supported in production 
ingestion, we define its spec here only

Review comment:
       add  a more general documentation of the role of this class for the time 
it is supported

##########
File path: 
processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
##########
@@ -321,10 +322,10 @@ private static QueryableIndex buildIndex(String 
storeDoubleAsFloat) throws IOExc
         )
         .build();
 
-    final IncrementalIndex index = new IncrementalIndex.Builder()

Review comment:
       from here on forward same 5 lines changes repeat for different tests

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
##########
@@ -205,30 +184,106 @@ public void setup() throws IOException
 
     GeneratorSchemaInfo basicSchema = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
     QuerySegmentSpec intervalSpec = new 
MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
-    List<AggregatorFactory> queryAggs = new ArrayList<>();
-    queryAggs.add(filteredMetrics[0]);
+    List<AggregatorFactory> queryAggs = 
Collections.singletonList(filteredMetric);
 
     query = Druids.newTimeseriesQueryBuilder()
                   .dataSource("blah")
                   .granularity(Granularities.ALL)
                   .intervals(intervalSpec)
                   .aggregators(queryAggs)
-                  .descending(false)
+                  .descending(descending)
                   .build();
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
+  {
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(FilteredAggregatorBenchmark global) throws 
JsonProcessingException
+    {
+      global.appendableIndexSpec = 
IncrementalIndexCreator.parseIndexType(indexType);

Review comment:
       worth mentioning that this is where the type of the index is set in the 
spec, and later used by the factory

##########
File path: 
processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
##########
@@ -268,7 +217,7 @@ private static MapBasedInputRow getLongRow(long timestamp, 
int dimensionCount)
   public void testCaseSensitivity() throws Exception
   {
     long timestamp = System.currentTimeMillis();
-    IncrementalIndex index = 
closerRule.closeLater(indexCreator.createIndex(DEFAULT_AGGREGATOR_FACTORIES));
+    IncrementalIndex<?> index = indexCreator.createIndex((Object) 
DEFAULT_AGGREGATOR_FACTORIES);

Review comment:
       all changes from here are due to the generic type? 

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
##########
@@ -124,46 +131,51 @@ private MapBasedInputRow getStringRow(long timestamp, int 
dimensionCount)
     return new MapBasedInputRow(timestamp, dimensionList, builder.build());
   }
 
-  private IncrementalIndex makeIncIndex()
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(aggs)
         .setDeserializeComplexMetrics(false)
-        .setMaxRowCount(MAX_ROWS)
-        .buildOnheap();
+        .setMaxRowCount(rowsPerSegment)
+        .build();
   }
 
   @Setup
-  public void setup()
+  public void setup() throws JsonProcessingException
   {
-    for (int i = 0; i < MAX_ROWS; i++) {
+    appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+
+    for (int i = 0; i < rowsPerSegment; i++) {
       longRows.add(getLongRow(0, DIMENSION_COUNT));
     }
 
-    for (int i = 0; i < MAX_ROWS; i++) {
+    for (int i = 0; i < rowsPerSegment; i++) {
       floatRows.add(getFloatRow(0, DIMENSION_COUNT));
     }
 
-    for (int i = 0; i < MAX_ROWS; i++) {
+    for (int i = 0; i < rowsPerSegment; i++) {
       stringRows.add(getStringRow(0, DIMENSION_COUNT));
     }
   }
 
-  @Setup(Level.Iteration)
+  @Setup(Level.Invocation)
   public void setup2()
   {
     incIndex = makeIncIndex();
-    incFloatIndex = makeIncIndex();

Review comment:
       Does adding all rows into one index equivalent to having 3 indices?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to