Eshcar commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r499506440
##########
File path:
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
##########
@@ -115,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception
public void testConvert()
{
KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+ null,
Review comment:
A way to avoid the null (in multiple places) is to add another constructor
that takes the additional parameter while still supporting the previous constructor.
##########
File path:
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT ==
null
?
DEFAULT_ROW_FLUSH_BOUNDARY
: maxRowsInMemoryCOMPAT
: maxRowsInMemory;
+ this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
Review comment:
Add a comment noting that this is the line that sets the (configurable) I2
type.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -210,8 +211,8 @@ private InputSourceReader buildReader(
.withRollup(dataSchema.getGranularitySpec().isRollup())
.build();
- return new IncrementalIndex.Builder().setIndexSchema(schema)
+ return (OnheapIncrementalIndex) new
OnheapIncrementalIndex.Builder().setIndexSchema(schema)
Review comment:
is this down-casting required?
##########
File path:
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
##########
@@ -2739,6 +2739,7 @@ private KinesisIndexTask createTask(
boolean resetOffsetAutomatically = false;
int maxRowsInMemory = 1000;
final KinesisIndexTaskTuningConfig tuningConfig = new
KinesisIndexTaskTuningConfig(
+ null,
Review comment:
likewise null parameter
##########
File path:
indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
##########
@@ -302,11 +301,11 @@ private static IncrementalIndex makeIncrementalIndex(
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.build();
- IncrementalIndex newIndex = new IncrementalIndex.Builder()
+ IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()
Review comment:
add comment: this is the line that makes I2 configurable
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
@Nullable Integer maxSavedParseExceptions
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
Review comment:
likewise duplicate code
##########
File path:
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
##########
@@ -193,7 +196,7 @@ public String toString()
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
- ", maxBytesInMemory=" +
TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
+ ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
Review comment:
nice. Looks like a change in the right direction
##########
File path:
extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
##########
@@ -317,6 +318,32 @@ public void testCheckSegmentsAndSubmitTasks() throws
IOException
}
+ @Test
Review comment:
How is this test related to the configurable index type?
Please add a documentation line explaining the test.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
Review comment:
Having this code duplicated is weird.
Shouldn't all these XXXTuningConfig classes have some common parent with the
shared code?
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,129 +316,6 @@ protected IncrementalIndex(
}
}
- public static class Builder
Review comment:
so this static class has now changed to an abstract class with different
concrete builder classes - nice
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -129,15 +136,11 @@ public FactsHolder getFacts()
new
OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory,
concurrentEventAdd)
);
- if (i == 0) {
- aggOffsetInBuffer[i] = 0;
- } else {
- aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i -
1].getMaxIntermediateSizeWithNulls();
- }
+ aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
Review comment:
nice - simpler
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
}
aggBuffers.clear();
}
+
+ public static class Builder extends AppendableIndexBuilder
+ {
+ @Nullable
+ NonBlockingPool<ByteBuffer> bufferPool = null;
+
+ public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+ {
+ this.bufferPool = bufferPool;
+ return this;
+ }
+
+ @Override
+ public void validate()
+ {
+ super.validate();
+ if (bufferPool == null) {
+ throw new IllegalArgumentException("bufferPool cannot be null");
+ }
+ }
+
+ @Override
+ protected OffheapIncrementalIndex buildInner()
Review comment:
This method should return IncrementalIndex.
Is it legal to change the signature?
##########
File path:
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
##########
@@ -117,24 +120,24 @@ public String apply(DimensionSpec input)
.withMinTimestamp(granTimeStart.getMillis())
.build();
+
+ AppendableIndexBuilder indexBuilder;
+
if (query.getContextValue("useOffheap", false)) {
- index = new IncrementalIndex.Builder()
- .setIndexSchema(indexSchema)
- .setDeserializeComplexMetrics(false)
- .setConcurrentEventAdd(true)
- .setSortFacts(sortResults)
- .setMaxRowCount(querySpecificConfig.getMaxResults())
- .buildOffheap(bufferPool);
+ indexBuilder = new OffheapIncrementalIndex.Builder()
+ .setBufferPool(bufferPool);
} else {
- index = new IncrementalIndex.Builder()
- .setIndexSchema(indexSchema)
- .setDeserializeComplexMetrics(false)
- .setConcurrentEventAdd(true)
- .setSortFacts(sortResults)
- .setMaxRowCount(querySpecificConfig.getMaxResults())
- .buildOnheap();
+ indexBuilder = new OnheapIncrementalIndex.Builder();
}
+ index = indexBuilder
Review comment:
great. reducing duplication
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -434,4 +436,39 @@ public ColumnCapabilities getColumnCapabilities(String
columnName)
}
}
+ public static class Builder extends AppendableIndexBuilder
+ {
+ @Override
+ protected OnheapIncrementalIndex buildInner()
Review comment:
Likewise - a different signature from the parent class.
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -129,15 +136,11 @@ public FactsHolder getFacts()
new
OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory,
concurrentEventAdd)
);
- if (i == 0) {
- aggOffsetInBuffer[i] = 0;
- } else {
- aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i -
1].getMaxIntermediateSizeWithNulls();
- }
+ aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
+ aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls();
}
- aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] +
metrics[metrics.length
- -
1].getMaxIntermediateSizeWithNulls();
+ aggsTotalSize = aggsCurOffsetInBuffer;
Review comment:
why is this correct?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]