a2l007 commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501099215
##########
File path:
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
##########
@@ -117,24 +120,24 @@ public String apply(DimensionSpec input)
.withMinTimestamp(granTimeStart.getMillis())
.build();
+
+ AppendableIndexBuilder indexBuilder;
Review comment:
Can this be final?
##########
File path:
processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule
Review comment:
Could you please add a test for this module?
##########
File path:
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT ==
null
?
DEFAULT_ROW_FLUSH_BOUNDARY
: maxRowsInMemoryCOMPAT
: maxRowsInMemory;
+ this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
// initializing this to 0, it will be lazily initialized to a value
// @see
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
Review comment:
Comment needs to be updated
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ?
DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily intialized to a value
// @see
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
Review comment:
Comment needs to be modified.
##########
File path:
server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
##########
@@ -132,6 +136,7 @@ public RealtimeTuningConfig(
@JsonProperty("dedupColumn") @Nullable String dedupColumn
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ?
DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
Review comment:
This comment is no longer valid. Please update accordingly.
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
##########
@@ -17,25 +17,16 @@
* under the License.
*/
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.incremental;
-public class TuningConfigs
+import org.apache.druid.guice.annotations.UnstableApi;
+
+@UnstableApi
+public interface AppendableIndexSpec
Review comment:
Please add javadocs.
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
}
}
- public static class Builder
+ /**
+ * This class exists only as backward competability to reduce the number of
modified lines.
+ */
+ public static class Builder extends OnheapIncrementalIndex.Builder
{
- @Nullable
- private IncrementalIndexSchema incrementalIndexSchema;
- private boolean deserializeComplexMetrics;
- private boolean concurrentEventAdd;
- private boolean sortFacts;
- private int maxRowCount;
- private long maxBytesInMemory;
-
- public Builder()
- {
- incrementalIndexSchema = null;
- deserializeComplexMetrics = true;
- concurrentEventAdd = false;
- sortFacts = true;
- maxRowCount = 0;
- maxBytesInMemory = 0;
- }
-
+ @Override
public Builder setIndexSchema(final IncrementalIndexSchema
incrementalIndexSchema)
{
- this.incrementalIndexSchema = incrementalIndexSchema;
- return this;
+ return (Builder) super.setIndexSchema(incrementalIndexSchema);
}
- /**
- * A helper method to set a simple index schema with only metrics and
default values for the other parameters. Note
- * that this method is normally used for testing and benchmarking; it is
unlikely that you would use it in
- * production settings.
- *
- * @param metrics variable array of {@link AggregatorFactory} metrics
- *
- * @return this
- */
- @VisibleForTesting
+ @Override
public Builder setSimpleTestingIndexSchema(final AggregatorFactory...
metrics)
{
- return setSimpleTestingIndexSchema(null, metrics);
+ return (Builder) super.setSimpleTestingIndexSchema(metrics);
}
-
- /**
- * A helper method to set a simple index schema with controllable metrics
and rollup, and default values for the
- * other parameters. Note that this method is normally used for testing
and benchmarking; it is unlikely that you
- * would use it in production settings.
- *
- * @param metrics variable array of {@link AggregatorFactory} metrics
- *
- * @return this
- */
- @VisibleForTesting
+ @Override
public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final
AggregatorFactory... metrics)
{
- IncrementalIndexSchema.Builder builder = new
IncrementalIndexSchema.Builder().withMetrics(metrics);
- this.incrementalIndexSchema = rollup != null ?
builder.withRollup(rollup).build() : builder.build();
- return this;
+ return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
}
+ @Override
public Builder setDeserializeComplexMetrics(final boolean
deserializeComplexMetrics)
{
- this.deserializeComplexMetrics = deserializeComplexMetrics;
- return this;
+ return (Builder)
super.setDeserializeComplexMetrics(deserializeComplexMetrics);
}
+ @Override
public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
{
- this.concurrentEventAdd = concurrentEventAdd;
- return this;
+ return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
}
+ @Override
public Builder setSortFacts(final boolean sortFacts)
{
- this.sortFacts = sortFacts;
- return this;
+ return (Builder) super.setSortFacts(sortFacts);
}
+ @Override
public Builder setMaxRowCount(final int maxRowCount)
{
- this.maxRowCount = maxRowCount;
- return this;
+ return (Builder) super.setMaxRowCount(maxRowCount);
}
- //maxBytesInMemory only applies to OnHeapIncrementalIndex
+ @Override
public Builder setMaxBytesInMemory(final long maxBytesInMemory)
{
- this.maxBytesInMemory = maxBytesInMemory;
- return this;
+ return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
}
public OnheapIncrementalIndex buildOnheap()
Review comment:
Do we still need this method?
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
}
aggBuffers.clear();
}
+
+ public static class Builder extends AppendableIndexBuilder
+ {
+ @Nullable
+ NonBlockingPool<ByteBuffer> bufferPool = null;
+
+ public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+ {
+ this.bufferPool = bufferPool;
+ return this;
+ }
+
+ @Override
+ public void validate()
+ {
+ super.validate();
+ if (bufferPool == null) {
+ throw new IllegalArgumentException("bufferPool cannot be null");
+ }
+ }
+
+ @Override
+ protected OffheapIncrementalIndex buildInner()
+ {
+ return new OffheapIncrementalIndex(
+ Objects.requireNonNull(incrementalIndexSchema,
"incrementalIndexSchema is null"),
+ deserializeComplexMetrics,
+ concurrentEventAdd,
+ sortFacts,
+ maxRowCount,
+ Objects.requireNonNull(bufferPool, "bufferPool is null")
Review comment:
Isn't validate() already checking this?
##########
File path:
server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
public interface TuningConfig
{
boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+ AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new
OnheapIncrementalIndex.Spec();
int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
- // We initially estimated this to be 1/3(max jvm memory), but
bytesCurrentlyInMemory only
- // tracks active index and not the index being flushed to disk, to account
for that
- // we halved default to 1/6(max jvm memory)
- long DEFAULT_MAX_BYTES_IN_MEMORY =
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+ /**
+ * The inceremental index implementation to use
+ */
+ AppendableIndexSpec getAppendableIndexSpec();
+
+ /**
+ * Maximum number of bytes (estimated) to store in memory before persisting
to local storage
+ */
+ long getMaxBytesInMemory();
+
+ /**
+ * Maximum number of bytes (estimated) to store in memory before persisting
to local storage.
+ * If getMaxBytesInMemory() returns 0, the appendable index default will be
used.
+ */
+ default long getMaxBytesInMemoryOrDefault()
+ {
+ // In the main tuningConfig class constructor, we set the maxBytes to 0 if
null to avoid setting
+ // maxBytes to max jvm memory of the process that starts first. Instead we
set the default based on
+ // the actual task node's jvm memory.
+ final long maxBytesInMemory = getMaxBytesInMemory();
+ if (maxBytesInMemory > 0) {
+ return maxBytesInMemory;
+ } else if (maxBytesInMemory == 0) {
+ return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+ } else {
+ return Long.MAX_VALUE;
+ }
+ }
+
+ PartitionsSpec getPartitionsSpec();
+
+ IndexSpec getIndexSpec();
+
+ IndexSpec getIndexSpecForIntermediatePersists();
Review comment:
Why do we need to move these out of AppenderatorConfig?
##########
File path:
server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
public interface TuningConfig
{
boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+ AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new
OnheapIncrementalIndex.Spec();
int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
- // We initially estimated this to be 1/3(max jvm memory), but
bytesCurrentlyInMemory only
- // tracks active index and not the index being flushed to disk, to account
for that
- // we halved default to 1/6(max jvm memory)
- long DEFAULT_MAX_BYTES_IN_MEMORY =
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+ /**
+ * The inceremental index implementation to use
Review comment:
nit: typo — "inceremental" should be "incremental".
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
@Nullable Integer maxSavedParseExceptions
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ?
TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
Review comment:
Comment needs to be modified.
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+ @Nullable
+ protected IncrementalIndexSchema incrementalIndexSchema = null;
+ protected boolean deserializeComplexMetrics = true;
+ protected boolean concurrentEventAdd = false;
+ protected boolean sortFacts = true;
+ protected int maxRowCount = 0;
+ protected long maxBytesInMemory = 0;
+
+ protected final Logger log = new Logger(this.getClass().getName());
+
+ public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema
incrementalIndexSchema)
+ {
+ this.incrementalIndexSchema = incrementalIndexSchema;
+ return this;
+ }
+
+ /**
+ * A helper method to set a simple index schema with only metrics and
default values for the other parameters. Note
+ * that this method is normally used for testing and benchmarking; it is
unlikely that you would use it in
+ * production settings.
+ *
+ * @param metrics variable array of {@link AggregatorFactory} metrics
+ *
+ * @return this
+ */
+ @VisibleForTesting
+ public AppendableIndexBuilder setSimpleTestingIndexSchema(final
AggregatorFactory... metrics)
+ {
+ return setSimpleTestingIndexSchema(null, metrics);
+ }
+
+
+ /**
+ * A helper method to set a simple index schema with controllable metrics
and rollup, and default values for the
+ * other parameters. Note that this method is normally used for testing and
benchmarking; it is unlikely that you
+ * would use it in production settings.
+ *
+ * @param metrics variable array of {@link AggregatorFactory} metrics
+ *
+ * @return this
+ */
+ @VisibleForTesting
+ public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean
rollup, final AggregatorFactory... metrics)
+ {
+ IncrementalIndexSchema.Builder builder = new
IncrementalIndexSchema.Builder().withMetrics(metrics);
+ this.incrementalIndexSchema = rollup != null ?
builder.withRollup(rollup).build() : builder.build();
+ return this;
+ }
+
+ public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean
deserializeComplexMetrics)
+ {
+ this.deserializeComplexMetrics = deserializeComplexMetrics;
+ return this;
+ }
+
+ public AppendableIndexBuilder setConcurrentEventAdd(final boolean
concurrentEventAdd)
+ {
+ this.concurrentEventAdd = concurrentEventAdd;
+ return this;
+ }
+
+ public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+ {
+ this.sortFacts = sortFacts;
+ return this;
+ }
+
+ public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+ {
+ this.maxRowCount = maxRowCount;
+ return this;
+ }
+
+ public AppendableIndexBuilder setMaxBytesInMemory(final long
maxBytesInMemory)
+ {
+ this.maxBytesInMemory = maxBytesInMemory;
+ return this;
+ }
+
+ public void validate()
+ {
+ if (maxRowCount <= 0) {
+ throw new IllegalArgumentException("Invalid max row count: " +
maxRowCount);
+ }
+
+ if (incrementalIndexSchema == null) {
+ throw new IllegalArgumentException("incrementIndexSchema cannot be
null");
+ }
+ }
+
+ public final IncrementalIndex build()
Review comment:
buildInner should be okay.
It might be better to switch the log level to DEBUG though.
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
}
aggBuffers.clear();
}
+
+ public static class Builder extends AppendableIndexBuilder
+ {
+ @Nullable
+ NonBlockingPool<ByteBuffer> bufferPool = null;
+
+ public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+ {
+ this.bufferPool = bufferPool;
+ return this;
+ }
+
+ @Override
+ public void validate()
+ {
+ super.validate();
+ if (bufferPool == null) {
+ throw new IllegalArgumentException("bufferPool cannot be null");
+ }
+ }
+
+ @Override
+ protected OffheapIncrementalIndex buildInner()
+ {
+ return new OffheapIncrementalIndex(
+ Objects.requireNonNull(incrementalIndexSchema,
"incrementalIndexSchema is null"),
+ deserializeComplexMetrics,
+ concurrentEventAdd,
+ sortFacts,
+ maxRowCount,
+ Objects.requireNonNull(bufferPool, "bufferPool is null")
+ );
+ }
+ }
+
+ public static class Spec implements AppendableIndexSpec, Supplier<ByteBuffer>
+ {
+ public static final String TYPE = "offheap";
+ static final int DEFAULT_BUFFER_SIZE = 1 << 23;
+ static final int DEFAULT_CACHE_SIZE = 1 << 30;
+
+ final int bufferSize;
+ final int cacheSize;
+ final NonBlockingPool<ByteBuffer> bufferPool;
+
+ @JsonCreator
+ public Spec(
+ final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+ final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+ )
+ {
+ this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize :
DEFAULT_BUFFER_SIZE;
+ this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ?
cacheSize : DEFAULT_CACHE_SIZE;
+ this.bufferPool = new StupidPool<>(
+ "Offheap incremental-index buffer pool",
+ this,
+ 0,
+ this.cacheSize / this.bufferSize
Review comment:
The changes in this class, apart from conforming to the
AppendableIndexSpec interface, might be out of the scope of this PR and might
require additional tests of their own. Would it make sense to propose these
changes in a follow-up PR once the AppendableIndexSpec changes are merged?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]