liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501517293



##########
File path: 
processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, 
"incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")
+      );
+    }
+  }
+
+  public static class Spec implements AppendableIndexSpec, Supplier<ByteBuffer>
+  {
+    public static final String TYPE = "offheap";
+    static final int DEFAULT_BUFFER_SIZE = 1 << 23;
+    static final int DEFAULT_CACHE_SIZE = 1 << 30;
+
+    final int bufferSize;
+    final int cacheSize;
+    final NonBlockingPool<ByteBuffer> bufferPool;
+
+    @JsonCreator
+    public Spec(
+        final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+        final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+    )
+    {
+      this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : 
DEFAULT_BUFFER_SIZE;
+      this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? 
cacheSize : DEFAULT_CACHE_SIZE;
+      this.bufferPool = new StupidPool<>(
+          "Offheap incremental-index buffer pool",
+          this,
+          0,
+          this.cacheSize / this.bufferSize

Review comment:
       Removed

##########
File path: 
server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new 
OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but 
bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account 
for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = 
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use

Review comment:
       Fixed

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
       this.maxRowsInMemory = maxRowsInMemory == null ? 
TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see 
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Fixed

##########
File path: 
server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
##########
@@ -132,6 +136,7 @@ public RealtimeTuningConfig(
       @JsonProperty("dedupColumn") @Nullable String dedupColumn
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? 
DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
     // initializing this to 0, it will be lazily initialized to a value
     // @see 
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Fixed

##########
File path: 
processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+  @Nullable
+  protected IncrementalIndexSchema incrementalIndexSchema = null;
+  protected boolean deserializeComplexMetrics = true;
+  protected boolean concurrentEventAdd = false;
+  protected boolean sortFacts = true;
+  protected int maxRowCount = 0;
+  protected long maxBytesInMemory = 0;
+
+  protected final Logger log = new Logger(this.getClass().getName());
+
+  public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema 
incrementalIndexSchema)
+  {
+    this.incrementalIndexSchema = incrementalIndexSchema;
+    return this;
+  }
+
+  /**
+   * A helper method to set a simple index schema with only metrics and 
default values for the other parameters. Note
+   * that this method is normally used for testing and benchmarking; it is 
unlikely that you would use it in
+   * production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(final 
AggregatorFactory... metrics)
+  {
+    return setSimpleTestingIndexSchema(null, metrics);
+  }
+
+
+  /**
+   * A helper method to set a simple index schema with controllable metrics 
and rollup, and default values for the
+   * other parameters. Note that this method is normally used for testing and 
benchmarking; it is unlikely that you
+   * would use it in production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean 
rollup, final AggregatorFactory... metrics)
+  {
+    IncrementalIndexSchema.Builder builder = new 
IncrementalIndexSchema.Builder().withMetrics(metrics);
+    this.incrementalIndexSchema = rollup != null ? 
builder.withRollup(rollup).build() : builder.build();
+    return this;
+  }
+
+  public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean 
deserializeComplexMetrics)
+  {
+    this.deserializeComplexMetrics = deserializeComplexMetrics;
+    return this;
+  }
+
+  public AppendableIndexBuilder setConcurrentEventAdd(final boolean 
concurrentEventAdd)
+  {
+    this.concurrentEventAdd = concurrentEventAdd;
+    return this;
+  }
+
+  public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+  {
+    this.sortFacts = sortFacts;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+  {
+    this.maxRowCount = maxRowCount;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxBytesInMemory(final long 
maxBytesInMemory)
+  {
+    this.maxBytesInMemory = maxBytesInMemory;
+    return this;
+  }
+
+  public void validate()
+  {
+    if (maxRowCount <= 0) {
+      throw new IllegalArgumentException("Invalid max row count: " + 
maxRowCount);
+    }
+
+    if (incrementalIndexSchema == null) {
+      throw new IllegalArgumentException("incrementIndexSchema cannot be 
null");
+    }
+  }
+
+  public final IncrementalIndex build()

Review comment:
       Changed to DEBUG.

##########
File path: 
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward competability to reduce the number of 
modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema 
incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and 
default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is 
unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... 
metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics 
and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing 
and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final 
AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new 
IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? 
builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean 
deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) 
super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
        After reading your comment about the off-heap builder, I think it is 
inevitable to leave this builder and add back the `buildOffheap()` method as 
well.

##########
File path: 
processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
##########
@@ -17,25 +17,16 @@
  * under the License.
  */
 
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.incremental;
 
-public class TuningConfigs
+import org.apache.druid.guice.annotations.UnstableApi;
+
+@UnstableApi
+public interface AppendableIndexSpec

Review comment:
       Added

##########
File path: 
processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       Added. See `AppendableIndexSpecTest`.

##########
File path: 
extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
##########
@@ -317,6 +318,32 @@ public void testCheckSegmentsAndSubmitTasks() throws 
IOException
 
   }
 
+  @Test

Review comment:
       Added

##########
File path: 
processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       Added. See `AppendableIndexSpecTest`.

##########
File path: 
processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       This is tested as part of the separate `TuningConfig` tests:
   * `HadoopTuningConfigTest`
   * `ParallelIndexTuningConfigTest`
   * `KafkaIndexTuningConfigTest`
   * `KafkaSupervisorTuningConfigTest`
   * `KinesisIndexTaskTuningConfigTest`
   * `KinesisSupervisorTuningConfigTest`
   * `RealtimeTuningConfigTest`

##########
File path: 
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == 
null
                                                       ? 
DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT 
: maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       I'm unsure why this line needs to be documented. It is no different from 
any of the other tuning configurations in this constructor.
   `AppendableIndexSpec` is documented, so it is self-explanatory.

##########
File path: 
server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new 
OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but 
bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account 
for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = 
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting 
to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting 
to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be 
used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if 
null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we 
set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > We should look at incrementally refactoring 
HadoopTuningConfig.getRowFlushBoundary into getMaxRowsInMemory so that it could 
be moved into TuningConfig as well.
   
   I opened (earlier) #10478 for that. It is short.
   If you prefer, I can apply your comments from this PR to #10478 and merge it 
before this one.
   It address both `HadoopTuningConfig.getRowFlushBoundary` and the 
`TuningConfig` interface refactoring.

##########
File path: 
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
##########
@@ -115,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception
   public void testConvert()
   {
     KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+        null,

Review comment:
       I think it is best to avoid a new constructor here.
   Otherwise, each new parameter will incur a new constructor.

##########
File path: 
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
##########
@@ -2739,6 +2739,7 @@ private KinesisIndexTask createTask(
     boolean resetOffsetAutomatically = false;
     int maxRowsInMemory = 1000;
     final KinesisIndexTaskTuningConfig tuningConfig = new 
KinesisIndexTaskTuningConfig(
+        null,

Review comment:
       See above.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer 
maxSavedParseExceptions
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       I agree. But that will require a refactor to unify some of the 
`TuningConfig` implementations to one common (abstract?) implementation.
   This can be done before or after this PR.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       See above.

##########
File path: 
indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
##########
@@ -302,11 +301,11 @@ private static IncrementalIndex makeIncrementalIndex(
         
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
         .build();
 
-    IncrementalIndex newIndex = new IncrementalIndex.Builder()
+    IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()

Review comment:
       Done

##########
File path: 
server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new 
OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but 
bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account 
for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = 
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting 
to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting 
to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be 
used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if 
null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we 
set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > We should look at incrementally refactoring 
HadoopTuningConfig.getRowFlushBoundary into getMaxRowsInMemory so that it could 
be moved into TuningConfig as well.
   
   I opened (earlier) #10478 for that. It is short.
   If you prefer, I can apply your comments from this PR to #10478 and merge it 
before this one.
   It address both `HadoopTuningConfig.getRowFlushBoundary` and the 
`TuningConfig` interface refactoring.
   
   EDIT: I already applied your comments there.

##########
File path: 
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward competability to reduce the number of 
modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema 
incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and 
default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is 
unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... 
metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics 
and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing 
and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final 
AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new 
IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? 
builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean 
deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) 
super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       Done. See #10494




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to