This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 90de985c10d remove useMaxMemoryEstimates in favor of always using the
false behavior which uses the newer more accurate estimates (#17936)
90de985c10d is described below
commit 90de985c10de379c655f07a2cbe7d259f68866c7
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Apr 21 09:19:15 2025 -0700
remove useMaxMemoryEstimates in favor of always using the false behavior
which uses the newer more accurate estimates (#17936)
* remove useMaxMemoryEstimates in favor of always using the false behavior
which uses the newer more accurate estimates
* fixes
* fix test
* fix style
---
.../indexing/StringDimensionIndexerBenchmark.java | 2 +-
.../StringDimensionIndexerProcessBenchmark.java | 4 +-
.../MaterializedViewSupervisorSpec.java | 1 -
.../SegmentGeneratorFrameProcessorFactory.java | 1 -
.../apache/druid/indexer/HadoopTuningConfig.java | 13 --
.../apache/druid/indexer/IndexGeneratorJob.java | 1 -
.../druid/indexer/BatchDeltaIngestionTest.java | 1 -
.../indexer/DetermineHashedPartitionsJobTest.java | 3 +-
.../druid/indexer/DeterminePartitionsJobTest.java | 1 -
.../indexer/DetermineRangePartitionsJobTest.java | 2 +-
.../indexer/HadoopDruidIndexerConfigTest.java | 1 -
.../druid/indexer/HadoopTuningConfigTest.java | 1 -
.../druid/indexer/IndexGeneratorJobTest.java | 1 -
.../org/apache/druid/indexer/JobHelperTest.java | 1 -
.../indexer/path/GranularityPathSpecTest.java | 1 -
.../druid/indexing/common/task/AbstractTask.java | 11 --
.../indexing/common/task/BatchAppenderators.java | 10 +-
.../druid/indexing/common/task/IndexTask.java | 3 +-
.../apache/druid/indexing/common/task/Tasks.java | 11 --
.../batch/parallel/PartialSegmentGenerateTask.java | 8 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 7 +-
.../seekablestream/SeekableStreamIndexTask.java | 1 -
.../common/task/TestAppenderatorsManager.java | 4 -
.../apache/druid/segment/DimensionDictionary.java | 20 +--
.../org/apache/druid/segment/DimensionHandler.java | 4 +-
.../druid/segment/DoubleDimensionHandler.java | 2 +-
.../druid/segment/FloatDimensionHandler.java | 2 +-
.../apache/druid/segment/LongDimensionHandler.java | 2 +-
.../segment/NestedCommonFormatColumnHandler.java | 2 +-
.../druid/segment/StringDimensionDictionary.java | 14 +--
.../druid/segment/StringDimensionHandler.java | 4 +-
.../druid/segment/StringDimensionIndexer.java | 20 +--
.../incremental/AppendableIndexBuilder.java | 7 --
.../segment/incremental/IncrementalIndex.java | 12 +-
.../incremental/OnHeapAggregateProjection.java | 37 ++----
.../incremental/OnheapIncrementalIndex.java | 134 +++------------------
.../druid/segment/StringDimensionIndexerTest.java | 82 -------------
.../incremental/IncrementalIndexRowSizeTest.java | 15 +--
.../realtime/appenderator/Appenderators.java | 4 -
.../appenderator/AppenderatorsManager.java | 2 -
.../realtime/appenderator/BatchAppenderator.java | 7 +-
.../DummyForInjectionAppenderatorsManager.java | 2 -
.../appenderator/PeonAppenderatorsManager.java | 4 -
.../realtime/appenderator/StreamAppenderator.java | 7 +-
.../UnifiedIndexerAppenderatorsManager.java | 4 -
.../apache/druid/segment/realtime/sink/Sink.java | 11 +-
.../appenderator/BatchAppenderatorTest.java | 30 ++---
.../appenderator/BatchAppenderatorTester.java | 1 -
.../appenderator/StreamAppenderatorTest.java | 40 +++---
.../appenderator/StreamAppenderatorTester.java | 2 -
.../UnifiedIndexerAppenderatorsManagerTest.java | 1 -
.../druid/segment/realtime/sink/SinkTest.java | 6 +-
52 files changed, 108 insertions(+), 459 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerBenchmark.java
index 00bc119b2f5..81ce34152d4 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerBenchmark.java
@@ -55,7 +55,7 @@ public class StringDimensionIndexerBenchmark
@Setup
public void setup()
{
- indexer = new
StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true,
false, true);
+ indexer = new
StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true,
false);
for (int i = 0; i < cardinality; i++) {
indexer.processRowValsToUnsortedEncodedKeyComponent("abcd-" + i, true);
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
index c328e6967f6..c5bb9cf717e 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
@@ -69,7 +69,7 @@ public class StringDimensionIndexerProcessBenchmark
inputData[i] = (next < nullNumbers) ? null : ("abcd-" + next + "-efgh");
}
- fullIndexer = new
StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true,
false, false);
+ fullIndexer = new
StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true,
false);
for (String data : inputData) {
fullIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
}
@@ -83,7 +83,7 @@ public class StringDimensionIndexerProcessBenchmark
@Setup(Level.Iteration)
public void setupEmptyIndexer()
{
- emptyIndexer = new
StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true,
false, false);
+ emptyIndexer = new
StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true,
false);
}
@Setup(Level.Iteration)
diff --git
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index 6518dcc368d..b7b3249a8b9 100644
---
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -183,7 +183,6 @@ public class MaterializedViewSupervisorSpec implements
SupervisorSpec
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
- tuningConfig.isUseMaxMemoryEstimates(),
tuningConfig.isLeaveIntermediate(),
tuningConfig.isCleanupOnFailure(),
tuningConfig.isOverwriteFiles(),
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
index fed66e88955..5e783e6c6b4 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
@@ -194,7 +194,6 @@ public class SegmentGeneratorFrameProcessorFactory
frameContext.indexMerger(),
meters,
parseExceptionHandler,
- false,
// MSQ doesn't support CentralizedDatasourceSchema feature
as of now.
CentralizedDatasourceSchemaConfig.create(false)
);
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
index 9da6ead38cf..32040fd3765 100644
---
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
+++
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
@@ -63,7 +63,6 @@ public class HadoopTuningConfig implements TuningConfig
DEFAULT_MAX_ROWS_IN_MEMORY_BATCH,
0L,
false,
- false,
true,
false,
false,
@@ -92,7 +91,6 @@ public class HadoopTuningConfig implements TuningConfig
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
- private final boolean useMaxMemoryEstimates;
private final boolean leaveIntermediate;
private final boolean cleanupOnFailure;
private final boolean overwriteFiles;
@@ -129,7 +127,6 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
final @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
final @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
- final @JsonProperty("useMaxMemoryEstimates") @Nullable Boolean
useMaxMemoryEstimates,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") @Nullable Boolean
cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
@@ -163,7 +160,6 @@ public class HadoopTuningConfig implements TuningConfig
maxRowsInMemory,
Configs.valueOrDefault(maxRowsInMemoryCOMPAT,
DEFAULT_MAX_ROWS_IN_MEMORY_BATCH)
);
- this.useMaxMemoryEstimates = Configs.valueOrDefault(useMaxMemoryEstimates,
false);
this.appendableIndexSpec = Configs.valueOrDefault(appendableIndexSpec,
DEFAULT_APPENDABLE_INDEX);
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
@@ -266,12 +262,6 @@ public class HadoopTuningConfig implements TuningConfig
return maxBytesInMemory;
}
- @JsonProperty
- public boolean isUseMaxMemoryEstimates()
- {
- return useMaxMemoryEstimates;
- }
-
@JsonProperty
public boolean isLeaveIntermediate()
{
@@ -381,7 +371,6 @@ public class HadoopTuningConfig implements TuningConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
- useMaxMemoryEstimates,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
@@ -414,7 +403,6 @@ public class HadoopTuningConfig implements TuningConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
- useMaxMemoryEstimates,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
@@ -447,7 +435,6 @@ public class HadoopTuningConfig implements TuningConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
- useMaxMemoryEstimates,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index a41ced78e7c..9635471239c 100644
---
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -311,7 +311,6 @@ public class IndexGeneratorJob implements Jobby
.setIndexSchema(indexSchema)
.setMaxRowCount(tuningConfig.getMaxRowsInMemory())
.setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
-
.setUseMaxMemoryEstimates(tuningConfig.isUseMaxMemoryEstimates())
.build();
if (oldDimOrder != null &&
!indexSchema.getDimensionsSpec().hasFixedDimensions()) {
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index 65192d0a130..4ff97502866 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -479,7 +479,6 @@ public class BatchDeltaIngestionTest
false,
false,
false,
- false,
null,
false,
false,
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
index fcdda375bc2..5eb27b98ae8 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
@@ -218,7 +218,6 @@ public class DetermineHashedPartitionsJobTest
false,
false,
false,
- false,
null,
false,
false,
@@ -247,7 +246,7 @@ public class DetermineHashedPartitionsJobTest
Map<Long, List<HadoopyShardSpec>> shardSpecs =
indexerConfig.getSchema().getTuningConfig().getShardSpecs();
Assert.assertEquals(
expectedNumTimeBuckets,
- shardSpecs.entrySet().size()
+ shardSpecs.size()
);
int i = 0;
for (Map.Entry<Long, List<HadoopyShardSpec>> entry :
shardSpecs.entrySet()) {
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
index 3e604a7ef25..f57ce6bdd64 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
@@ -333,7 +333,6 @@ public class DeterminePartitionsJobTest
false,
false,
false,
- false,
null,
false,
false,
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
index 881fc3a3833..9869ff46561 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
@@ -382,7 +382,7 @@ public class DetermineRangePartitionsJobTest
null,
null,
null,
- false, false,
+ false,
false,
false,
false,
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
index 3178e21d4cc..1a0de3e2cb1 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -268,7 +268,6 @@ public class HadoopDruidIndexerConfigTest
false,
false,
false,
- false,
null,
false,
false,
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
index 5857f92172f..97c764dd10b 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
@@ -48,7 +48,6 @@ public class HadoopTuningConfigTest
null,
100,
null,
- false,
true,
true,
true,
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index 33f56f50400..4b4a7729dcf 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -534,7 +534,6 @@ public class IndexGeneratorJobTest
null,
maxRowsInMemory,
maxBytesInMemory,
- false,
true,
false,
false,
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
index 6559cd5faee..f2e49e3b605 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
@@ -177,7 +177,6 @@ public class JobHelperTest
false,
false,
false,
- false,
//Map of job properties
ImmutableMap.of(
"fs.s3.impl",
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
index 0d06d83f32d..7b7f2ca788a 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
@@ -67,7 +67,6 @@ public class GranularityPathSpecTest
false,
false,
false,
- false,
null,
false,
false,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 0c40f3b5287..a3331767ba9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -385,17 +385,6 @@ public abstract class AbstractTask implements Task
return context;
}
- /**
- * Whether maximum memory usage should be considered in estimation for
indexing tasks.
- */
- protected boolean isUseMaxMemoryEstimates()
- {
- return getContextValue(
- Tasks.USE_MAX_MEMORY_ESTIMATES,
- Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
- );
- }
-
protected ServiceMetricEvent.Builder getMetricBuilder()
{
return metricBuilder;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index 6af4402c35a..ae931e08286 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -42,8 +42,7 @@ public final class BatchAppenderators
DataSchema dataSchema,
AppenderatorConfig appenderatorConfig,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates
+ ParseExceptionHandler parseExceptionHandler
)
{
return newAppenderator(
@@ -55,8 +54,7 @@ public final class BatchAppenderators
appenderatorConfig,
toolbox.getSegmentPusher(),
rowIngestionMeters,
- parseExceptionHandler,
- useMaxMemoryEstimates
+ parseExceptionHandler
);
}
@@ -69,8 +67,7 @@ public final class BatchAppenderators
AppenderatorConfig appenderatorConfig,
DataSegmentPusher segmentPusher,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates
+ ParseExceptionHandler parseExceptionHandler
)
{
return appenderatorsManager.createBatchAppenderatorForTask(
@@ -84,7 +81,6 @@ public final class BatchAppenderators
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
toolbox.getCentralizedTableSchemaConfig()
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index cacf1c7f4f1..530d21e5f1d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -876,8 +876,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler, Pe
dataSchema,
tuningConfig,
buildSegmentsMeters,
- buildSegmentsParseExceptionHandler,
- isUseMaxMemoryEstimates()
+ buildSegmentsParseExceptionHandler
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver =
BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 5a1d7f1cac4..42bf34d7eaf 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -52,17 +52,6 @@ public class Tasks
public static final String TASK_LOCK_TYPE = "taskLockType";
public static final String USE_CONCURRENT_LOCKS = "useConcurrentLocks";
- /**
- * Context flag denoting if maximum possible values should be used to
estimate
- * on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
- * more details.
- * <p>
- * @deprecated This flag will be removed in future Druid releases, and the
new
- * method of memory estimation will be used in all cases.
- */
- @Deprecated
- public static final String USE_MAX_MEMORY_ESTIMATES =
"useMaxMemoryEstimates";
-
/**
* Context flag to denote if segments published to metadata by a task should
* have the {@code lastCompactionState} field set.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index c27c112071f..9edc9ebb8cb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -33,7 +33,6 @@ import
org.apache.druid.indexing.common.task.InputSourceProcessor;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.Tasks;
import
org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.input.WindowedSegmentId;
@@ -186,10 +185,6 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
);
- final boolean useMaxMemoryEstimates = getContextValue(
- Tasks.USE_MAX_MEMORY_ESTIMATES,
- Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
- );
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
@@ -199,8 +194,7 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
tuningConfig,
new ShuffleDataSegmentPusher(supervisorTaskId, getId(),
toolbox.getIntermediaryDataManager()),
buildSegmentsMeters,
- parseExceptionHandler,
- useMaxMemoryEstimates
+ parseExceptionHandler
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver =
BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 8a0a8d3e6fc..17ed802b86f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -391,10 +391,6 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
useLineageBasedSegmentAllocation
);
- final boolean useMaxMemoryEstimates = getContextValue(
- Tasks.USE_MAX_MEMORY_ESTIMATES,
- Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
- );
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
@@ -403,8 +399,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
dataSchema,
tuningConfig,
rowIngestionMeters,
- parseExceptionHandler,
- useMaxMemoryEstimates
+ parseExceptionHandler
);
boolean exceptionOccurred = false;
try (
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 2550408ddc0..e1c4455fe0e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -212,7 +212,6 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler,
- isUseMaxMemoryEstimates(),
toolbox.getCentralizedTableSchemaConfig()
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 417195326fb..3fa23961cfc 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -71,7 +71,6 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -94,7 +93,6 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
cachePopulatorStats,
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
return realtimeAppenderator;
@@ -112,7 +110,6 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -127,7 +124,6 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
indexMerger,
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
index 754934975c4..03c59cb446b 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
@@ -143,10 +143,6 @@ public abstract class DimensionDictionary<T extends
Comparable<T>>
*/
public long sizeInBytes()
{
- if (!computeOnHeapSize()) {
- throw new IllegalStateException("On-heap size computation is disabled");
- }
-
return sizeInBytes.get();
}
@@ -169,11 +165,8 @@ public abstract class DimensionDictionary<T extends
Comparable<T>>
}
}
- long extraSize = 0;
- if (computeOnHeapSize()) {
- // Add size of new dim value and 2 references (valueToId and idToValue)
- extraSize = estimateSizeOfValue(originalValue) + 2L * Long.BYTES;
- }
+ // Add size of new dim value and 2 references (valueToId and idToValue)
+ final long extraSize = estimateSizeOfValue(originalValue) + 2L *
Long.BYTES;
stamp = lock.writeLock();
try {
@@ -271,15 +264,6 @@ public abstract class DimensionDictionary<T extends
Comparable<T>>
/**
* Estimates the size of the dimension value in bytes.
- * <p>
- * This method is called when adding a new dimension value to the lookup only
- * if {@link #computeOnHeapSize()} returns true.
*/
public abstract long estimateSizeOfValue(T value);
-
- /**
- * Whether on-heap size of this dictionary should be computed.
- */
- public abstract boolean computeOnHeapSize();
-
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
index 689dfde16cd..e9359b74625 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
@@ -96,11 +96,9 @@ public interface DimensionHandler
* Creates a new DimensionIndexer, a per-dimension object responsible for
processing ingested rows in-memory, used
* by the IncrementalIndex. See {@link DimensionIndexer} interface for more
information.
*
- * @param useMaxMemoryEstimates true if the created DimensionIndexer should
use
- * maximum values to estimate on-heap memory
* @return A new DimensionIndexer object.
*/
- DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType>
makeIndexer(boolean useMaxMemoryEstimates);
+ DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType>
makeIndexer();
/**
* @deprecated use {@link #makeMerger(String, IndexSpec,
SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer)}
diff --git
a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
index e8b9b9a2c9c..db81931d4d5 100644
---
a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
@@ -71,7 +71,7 @@ public class DoubleDimensionHandler implements
DimensionHandler<Double, Double,
}
@Override
- public DimensionIndexer<Double, Double, Double> makeIndexer(boolean
useMaxMemoryEstimates)
+ public DimensionIndexer<Double, Double, Double> makeIndexer()
{
return new DoubleDimensionIndexer(dimensionName);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
index 0defd19c0a9..b66b9f9f411 100644
---
a/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
@@ -71,7 +71,7 @@ public class FloatDimensionHandler implements
DimensionHandler<Float, Float, Flo
}
@Override
- public DimensionIndexer<Float, Float, Float> makeIndexer(boolean
useMaxMemoryEstimates)
+ public DimensionIndexer<Float, Float, Float> makeIndexer()
{
return new FloatDimensionIndexer(dimensionName);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
index 928bd8e2b13..150c32215c0 100644
---
a/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
@@ -71,7 +71,7 @@ public class LongDimensionHandler implements
DimensionHandler<Long, Long, Long>
}
@Override
- public DimensionIndexer<Long, Long, Long> makeIndexer(boolean
useMaxMemoryEstimates)
+ public DimensionIndexer<Long, Long, Long> makeIndexer()
{
return new LongDimensionIndexer(dimensionName);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
index ab3f9053bf1..4e13df8a786 100644
---
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
@@ -71,7 +71,7 @@ public class NestedCommonFormatColumnHandler implements
DimensionHandler<Structu
}
@Override
- public DimensionIndexer<StructuredData, StructuredData, StructuredData>
makeIndexer(boolean useMaxMemoryEstimates)
+ public DimensionIndexer<StructuredData, StructuredData, StructuredData>
makeIndexer()
{
return new AutoTypeColumnIndexer(name, castTo);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/StringDimensionDictionary.java
b/processing/src/main/java/org/apache/druid/segment/StringDimensionDictionary.java
index ccff9a2589b..06d8c45eeae 100644
---
a/processing/src/main/java/org/apache/druid/segment/StringDimensionDictionary.java
+++
b/processing/src/main/java/org/apache/druid/segment/StringDimensionDictionary.java
@@ -24,18 +24,12 @@ package org.apache.druid.segment;
*/
public class StringDimensionDictionary extends DimensionDictionary<String>
{
- private final boolean computeOnHeapSize;
-
/**
* Creates a StringDimensionDictionary.
- *
- * @param computeOnHeapSize true if on-heap memory estimation of the
dictionary
- * size should be enabled, false otherwise
*/
- public StringDimensionDictionary(boolean computeOnHeapSize)
+ public StringDimensionDictionary()
{
super(String.class);
- this.computeOnHeapSize = computeOnHeapSize;
}
@Override
@@ -45,10 +39,4 @@ public class StringDimensionDictionary extends
DimensionDictionary<String>
// Total string size = 28B (string metadata) + 16B (char array metadata) +
2B * num letters
return 28 + 16 + (2L * value.length());
}
-
- @Override
- public boolean computeOnHeapSize()
- {
- return computeOnHeapSize;
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
index 37e62bad64c..f20ed3bbc1a 100644
---
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
@@ -158,9 +158,9 @@ public class StringDimensionHandler implements
DimensionHandler<Integer, int[],
}
@Override
- public DimensionIndexer<Integer, int[], String> makeIndexer(boolean
useMaxMemoryEstimates)
+ public DimensionIndexer<Integer, int[], String> makeIndexer()
{
- return new StringDimensionIndexer(multiValueHandling, hasBitmapIndexes,
hasSpatialIndexes, useMaxMemoryEstimates);
+ return new StringDimensionIndexer(multiValueHandling, hasBitmapIndexes,
hasSpatialIndexes);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
index 7f07ed0f15d..d1ce3cf48d0 100644
---
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
+++
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
@@ -57,21 +57,18 @@ public class StringDimensionIndexer extends
DictionaryEncodedColumnIndexer<int[]
private final MultiValueHandling multiValueHandling;
private final boolean hasBitmapIndexes;
private final boolean hasSpatialIndexes;
- private final boolean useMaxMemoryEstimates;
private volatile boolean hasMultipleValues = false;
public StringDimensionIndexer(
@Nullable MultiValueHandling multiValueHandling,
boolean hasBitmapIndexes,
- boolean hasSpatialIndexes,
- boolean useMaxMemoryEstimates
+ boolean hasSpatialIndexes
)
{
- super(new StringDimensionDictionary(!useMaxMemoryEstimates));
+ super(new StringDimensionDictionary());
this.multiValueHandling = multiValueHandling == null ?
MultiValueHandling.ofDefault() : multiValueHandling;
this.hasBitmapIndexes = hasBitmapIndexes;
this.hasSpatialIndexes = hasSpatialIndexes;
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
}
@Override
@@ -79,7 +76,7 @@ public class StringDimensionIndexer extends
DictionaryEncodedColumnIndexer<int[]
{
final int[] encodedDimensionValues;
final int oldDictSize = dimLookup.size();
- final long oldDictSizeInBytes = useMaxMemoryEstimates ? 0 :
dimLookup.sizeInBytes();
+ final long oldDictSizeInBytes = dimLookup.sizeInBytes();
// expressions which operate on multi-value string inputs as arrays might
spit out arrays, coerce to list
if (dimValues instanceof Object[]) {
@@ -136,14 +133,9 @@ public class StringDimensionIndexer extends
DictionaryEncodedColumnIndexer<int[]
sortedLookup = null;
}
- long effectiveSizeBytes;
- if (useMaxMemoryEstimates) {
- effectiveSizeBytes =
estimateEncodedKeyComponentSize(encodedDimensionValues);
- } else {
- // size of encoded array + dictionary size change
- effectiveSizeBytes = 16L + (long) encodedDimensionValues.length *
Integer.BYTES
- + (dimLookup.sizeInBytes() - oldDictSizeInBytes);
- }
+ // size of encoded array + dictionary size change
+ final long effectiveSizeBytes = 16L + ((long)
encodedDimensionValues.length * Integer.BYTES)
+ + (dimLookup.sizeInBytes() -
oldDictSizeInBytes);
return new EncodedKeyComponent<>(encodedDimensionValues,
effectiveSizeBytes);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index fcfca99cdfe..aca1a2ba2f1 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -37,7 +37,6 @@ public abstract class AppendableIndexBuilder
// DruidInputSource since that is the only case where we can have existing
metrics.
// This is currently only use by auto compaction and should not be use for
anything else.
protected boolean preserveExistingMetrics = false;
- protected boolean useMaxMemoryEstimates = false;
protected final Logger log = new Logger(this.getClass());
@@ -103,12 +102,6 @@ public abstract class AppendableIndexBuilder
return this;
}
- public AppendableIndexBuilder setUseMaxMemoryEstimates(final boolean
useMaxMemoryEstimates)
- {
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
- return this;
- }
-
public void validate()
{
if (maxRowCount <= 0) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 62a9541b5e9..ff7c8a48a85 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -253,7 +253,6 @@ public abstract class IncrementalIndex implements
IncrementalIndexRowSelector, C
private final Map<String, ColumnFormat> timeAndMetricsColumnFormats;
private final AtomicInteger numEntries = new AtomicInteger();
private final AtomicLong bytesInMemory = new AtomicLong();
- private final boolean useMaxMemoryEstimates;
private final boolean useSchemaDiscovery;
@@ -271,12 +270,10 @@ public abstract class IncrementalIndex implements
IncrementalIndexRowSelector, C
* This should only be set for
DruidInputSource since that is the only case where we
* can have existing metrics. This is
currently only use by auto compaction and
* should not be use for anything else.
- * @param useMaxMemoryEstimates true if max values should be used to
estimate memory
*/
protected IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
- final boolean preserveExistingMetrics,
- final boolean useMaxMemoryEstimates
+ final boolean preserveExistingMetrics
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
@@ -286,7 +283,6 @@ public abstract class IncrementalIndex implements
IncrementalIndexRowSelector, C
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = new CopyOnWriteArrayList<>();
this.preserveExistingMetrics = preserveExistingMetrics;
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec()
.useSchemaDiscovery();
@@ -898,7 +894,7 @@ public abstract class IncrementalIndex implements
IncrementalIndexRowSelector, C
private DimensionDesc initDimension(int dimensionIndex, String
dimensionName, DimensionHandler dimensionHandler)
{
- return new DimensionDesc(dimensionIndex, dimensionName, dimensionHandler,
useMaxMemoryEstimates);
+ return new DimensionDesc(dimensionIndex, dimensionName, dimensionHandler);
}
@Override
@@ -990,12 +986,12 @@ public abstract class IncrementalIndex implements
IncrementalIndexRowSelector, C
private final DimensionHandler<?, ?, ?> handler;
private final DimensionIndexer<?, ?, ?> indexer;
- public DimensionDesc(int index, String name, DimensionHandler<?, ?, ?>
handler, boolean useMaxMemoryEstimates)
+ public DimensionDesc(int index, String name, DimensionHandler<?, ?, ?>
handler)
{
this.index = index;
this.name = name;
this.handler = handler;
- this.indexer = handler.makeIndexer(useMaxMemoryEstimates);
+ this.indexer = handler.makeIndexer();
}
public DimensionDesc(int index, String name, DimensionHandler<?, ?, ?>
handler, DimensionIndexer<?, ?, ?> indexer)
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
index ff5b8563f70..c199c8d0871 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
@@ -74,8 +74,6 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new
ConcurrentHashMap<>();
private final ColumnSelectorFactory virtualSelectorFactory;
private final Map<String, ColumnSelectorFactory> aggSelectors;
- private final boolean useMaxMemoryEstimates;
- private final long maxBytesPerRowForAggregators;
private final long minTimestamp;
private final AtomicInteger rowCounter = new AtomicInteger(0);
private final AtomicInteger numEntries = new AtomicInteger(0);
@@ -84,15 +82,11 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
AggregateProjectionSpec projectionSpec,
Function<String, IncrementalIndex.DimensionDesc>
getBaseTableDimensionDesc,
Function<String, AggregatorFactory> getBaseTableAggregatorFactory,
- long minTimestamp,
- boolean useMaxMemoryEstimates,
- long maxBytesPerRowForAggregators
+ long minTimestamp
)
{
this.projectionSchema = projectionSpec.toMetadataSchema();
this.minTimestamp = minTimestamp;
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
- this.maxBytesPerRowForAggregators = maxBytesPerRowForAggregators;
// initialize dimensions, facts holder
this.dimensions = new ArrayList<>();
@@ -105,7 +99,7 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
this.dimensionsMap = new HashMap<>();
this.columnFormats = new LinkedHashMap<>();
- initializeAndValidateDimensions(projectionSpec, getBaseTableDimensionDesc,
useMaxMemoryEstimates);
+ initializeAndValidateDimensions(projectionSpec, getBaseTableDimensionDesc);
final IncrementalIndex.IncrementalIndexRowComparator rowComparator = new
IncrementalIndex.IncrementalIndexRowComparator(
projectionSchema.getTimeColumnPosition() < 0 ? dimensions.size() :
projectionSchema.getTimeColumnPosition(),
dimensions
@@ -173,10 +167,9 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
aggs,
inputRowHolder,
parseExceptionMessages,
- useMaxMemoryEstimates,
false
);
- totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 :
aggForProjectionSizeDelta);
+ totalSizeInBytes.addAndGet(aggForProjectionSizeDelta);
} else {
aggs = new Aggregator[aggregatorFactories.length];
long aggSizeForProjectionRow = factorizeAggs(aggregatorFactories, aggs);
@@ -185,15 +178,13 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
aggs,
inputRowHolder,
parseExceptionMessages,
- useMaxMemoryEstimates,
false
);
- final long estimatedSizeOfAggregators =
- useMaxMemoryEstimates ? maxBytesPerRowForAggregators :
aggSizeForProjectionRow;
+ final long estimatedSizeOfAggregators = aggSizeForProjectionRow;
final long projectionRowSize = key.estimateBytesInMemory()
+ estimatedSizeOfAggregators
+
OnheapIncrementalIndex.ROUGH_OVERHEAD_PER_MAP_ENTRY;
- totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 :
projectionRowSize);
+ totalSizeInBytes.addAndGet(projectionRowSize);
numEntries.incrementAndGet();
}
final int rowIndex = rowCounter.getAndIncrement();
@@ -374,8 +365,7 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
private void initializeAndValidateDimensions(
AggregateProjectionSpec projectionSpec,
- Function<String, IncrementalIndex.DimensionDesc>
getBaseTableDimensionDesc,
- boolean useMaxMemoryEstimates
+ Function<String, IncrementalIndex.DimensionDesc>
getBaseTableDimensionDesc
)
{
int i = 0;
@@ -393,8 +383,7 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
final IncrementalIndex.DimensionDesc childOnly = new
IncrementalIndex.DimensionDesc(
i++,
dimension.getName(),
- dimension.getDimensionHandler(),
- useMaxMemoryEstimates
+ dimension.getDimensionHandler()
);
dimensions.add(childOnly);
@@ -514,14 +503,10 @@ public class OnHeapAggregateProjection implements
IncrementalIndexRowSelector
for (int i = 0; i < aggregatorFactories.length; i++) {
final AggregatorFactory agg = aggregatorFactories[i];
// Creates aggregators to aggregate from input into output fields
- if (useMaxMemoryEstimates) {
- aggs[i] = agg.factorize(aggSelectors.get(agg.getName()));
- } else {
- AggregatorAndSize aggregatorAndSize =
agg.factorizeWithSize(aggSelectors.get(agg.getName()));
- aggs[i] = aggregatorAndSize.getAggregator();
- totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
- totalInitialSizeBytes += aggReferenceSize;
- }
+ AggregatorAndSize aggregatorAndSize =
agg.factorizeWithSize(aggSelectors.get(agg.getName()));
+ aggs[i] = aggregatorAndSize.getAggregator();
+ totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
+ totalInitialSizeBytes += aggReferenceSize;
}
return totalInitialSizeBytes;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index fa622175184..88154a5a474 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -84,14 +84,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
{
private static final Logger log = new Logger(OnheapIncrementalIndex.class);
- /**
- * Constant factor provided to {@link
AggregatorFactory#guessAggregatorHeapFootprint(long)} for footprint estimates.
- * This figure is large enough to catch most common rollup ratios, but not
so large that it will cause persists to
- * happen too often. If an actual workload involves a much higher rollup
ratio, then this may lead to excessive
- * heap usage. Users would have to work around that by lowering
maxRowsInMemory or maxBytesInMemory.
- */
- private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION =
100;
-
/**
* overhead per {@link ConcurrentSkipListMap.Node} object in facts table
*/
@@ -100,37 +92,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new
ConcurrentHashMap<>();
private final FactsHolder facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
- private final long maxBytesPerRowForAggregators;
protected final int maxRowCount;
protected final long maxBytesInMemory;
- /**
- * Flag denoting if max possible values should be used to estimate on-heap
mem
- * usage.
- * <p>
- * There is one instance of Aggregator per metric per row key.
- * <p>
- * <b>Old Method:</b> {@code useMaxMemoryEstimates = true} (default)
- * <ul>
- * <li>Aggregator: For a given metric, compute the max memory an aggregator
- * can use and multiply that by number of aggregators (same as number of
- * aggregated rows or number of unique row keys)</li>
- * <li>DimensionIndexer: For each row, encode dimension values and estimate
- * size of original dimension values</li>
- * </ul>
- *
- * <b>New Method:</b> {@code useMaxMemoryEstimates = false}
- * <ul>
- * <li>Aggregator: Get the initialize of an Aggregator instance, and add
the
- * incremental size required in each aggregation step.</li>
- * <li>DimensionIndexer: For each row, encode dimension values and estimate
- * size of dimension values only if they are newly added to the
dictionary</li>
- * </ul>
- * <p>
- * Thus the new method eliminates over-estimations.
- */
- private final boolean useMaxMemoryEstimates;
-
/**
* Aggregator name -> column selector factory for that aggregator.
*/
@@ -154,11 +118,10 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
long maxBytesInMemory,
// preserveExistingMetrics should only be set true for DruidInputSource
since that is the only case where we can
// have existing metrics. This is currently only use by auto compaction
and should not be use for anything else.
- boolean preserveExistingMetrics,
- boolean useMaxMemoryEstimates
+ boolean preserveExistingMetrics
)
{
- super(incrementalIndexSchema, preserveExistingMetrics,
useMaxMemoryEstimates);
+ super(incrementalIndexSchema, preserveExistingMetrics);
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE :
maxBytesInMemory;
if (incrementalIndexSchema.isRollup()) {
@@ -168,16 +131,13 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
} else {
this.facts = new PlainNonTimeOrderedFactsHolder(dimsComparator());
}
- this.maxBytesPerRowForAggregators =
- useMaxMemoryEstimates ?
getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0;
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.aggregateProjections = new
ObjectAVLTreeSet<>(AggregateProjectionMetadata.COMPARATOR);
this.projections = new HashMap<>();
- initializeProjections(incrementalIndexSchema, useMaxMemoryEstimates);
+ initializeProjections(incrementalIndexSchema);
}
- private void initializeProjections(IncrementalIndexSchema
incrementalIndexSchema, boolean useMaxMemoryEstimates)
+ private void initializeProjections(IncrementalIndexSchema
incrementalIndexSchema)
{
for (AggregateProjectionSpec projectionSpec :
incrementalIndexSchema.getProjections()) {
// initialize them all with 0 rows
@@ -194,50 +154,12 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
}
return null;
},
- incrementalIndexSchema.getMinTimestamp(),
- this.useMaxMemoryEstimates,
- this.maxBytesPerRowForAggregators
+ incrementalIndexSchema.getMinTimestamp()
);
projections.put(projectionSpec.getName(), projection);
}
}
- /**
- * Old method of memory estimation. Used only when {@link
#useMaxMemoryEstimates} is true.
- *
- * Gives estimated max size per aggregator. It is assumed that every
aggregator will have enough overhead for its own
- * object header and for a pointer to a selector. We are adding a
overhead-factor for each object as additional 16
- * bytes.
- * These 16 bytes or 128 bits is the object metadata for 64-bit JVM process
and consists of:
- * <ul>
- * <li>Class pointer which describes the object type: 64 bits
- * <li>Flags which describe state of the object including hashcode: 64 bits
- * <ul/>
- * total size estimation consists of:
- * <ul>
- * <li> metrics length : Integer.BYTES * len
- * <li> maxAggregatorIntermediateSize : getMaxIntermediateSize per
aggregator + overhead-factor(16 bytes)
- * </ul>
- *
- * @param incrementalIndexSchema
- *
- * @return long max aggregator size in bytes
- */
- private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema
incrementalIndexSchema)
- {
- final long rowsPerAggregator =
- incrementalIndexSchema.isRollup() ?
ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION : 1;
-
- long maxAggregatorIntermediateSize = ((long) Integer.BYTES) *
incrementalIndexSchema.getMetrics().length;
-
- for (final AggregatorFactory aggregator :
incrementalIndexSchema.getMetrics()) {
- maxAggregatorIntermediateSize +=
- (long) aggregator.guessAggregatorHeapFootprint(rowsPerAggregator) +
2L * Long.BYTES;
- }
-
- return maxAggregatorIntermediateSize;
- }
-
@Override
public FactsHolder getFacts()
{
@@ -323,7 +245,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
aggs = aggregators.get(priorIndex);
long aggSizeDelta = doAggregate(metrics, aggs, inputRowHolder,
parseExceptionMessages);
- totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta);
+ totalSizeInBytes.addAndGet(aggSizeDelta);
} else {
if (preserveExistingMetrics) {
aggs = new Aggregator[metrics.length * 2];
@@ -354,8 +276,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
}
// For a new key, row size = key size + aggregator size + overhead
- final long estimatedSizeOfAggregators =
- useMaxMemoryEstimates ? maxBytesPerRowForAggregators : aggSizeForRow;
+ final long estimatedSizeOfAggregators = aggSizeForRow;
final long rowSize = key.estimateBytesInMemory()
+ estimatedSizeOfAggregators
+ ROUGH_OVERHEAD_PER_MAP_ENTRY;
@@ -375,7 +296,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
* Creates aggregators for the given aggregator factories.
*
* @return Total initial size in bytes required by all the aggregators.
- * This value is non-zero only when {@link #useMaxMemoryEstimates} is false.
*/
private long factorizeAggs(
AggregatorFactory[] metrics,
@@ -387,25 +307,17 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
// Creates aggregators to aggregate from input into output fields
- if (useMaxMemoryEstimates) {
- aggs[i] = agg.factorize(selectors.get(agg.getName()));
- } else {
- AggregatorAndSize aggregatorAndSize =
agg.factorizeWithSize(selectors.get(agg.getName()));
- aggs[i] = aggregatorAndSize.getAggregator();
- totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
- totalInitialSizeBytes += aggReferenceSize;
- }
+ AggregatorAndSize aggregatorAndSize =
agg.factorizeWithSize(selectors.get(agg.getName()));
+ aggs[i] = aggregatorAndSize.getAggregator();
+ totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
+ totalInitialSizeBytes += aggReferenceSize;
// Creates aggregators to combine already aggregated field
if (preserveExistingMetrics) {
AggregatorFactory combiningAgg = agg.getCombiningFactory();
- if (useMaxMemoryEstimates) {
- aggs[i + metrics.length] =
combiningAgg.factorize(combiningAggSelectors.get(combiningAgg.getName()));
- } else {
- AggregatorAndSize aggregatorAndSize =
combiningAgg.factorizeWithSize(combiningAggSelectors.get(combiningAgg.getName()));
- aggs[i + metrics.length] = aggregatorAndSize.getAggregator();
- totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
- totalInitialSizeBytes += aggReferenceSize;
- }
+ AggregatorAndSize combiningAggAndSize =
combiningAgg.factorizeWithSize(combiningAggSelectors.get(combiningAgg.getName()));
+ aggs[i + metrics.length] = combiningAggAndSize.getAggregator();
+ totalInitialSizeBytes += combiningAggAndSize.getInitialSizeBytes();
+ totalInitialSizeBytes += aggReferenceSize;
}
}
return totalInitialSizeBytes;
@@ -414,9 +326,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
/**
* Performs aggregation for all of the aggregators.
*
- * @return Total incremental memory in bytes required by this step of the
- * aggregation. The returned value is non-zero only if
- * {@link #useMaxMemoryEstimates} is false.
+ * @return Total incremental memory in bytes required by this step of the
aggregation.
*/
private long doAggregate(
AggregatorFactory[] metrics,
@@ -425,7 +335,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
List<String> parseExceptionsHolder
)
{
- return doAggregate(metrics, aggs, inputRowHolder, parseExceptionsHolder,
useMaxMemoryEstimates, preserveExistingMetrics);
+ return doAggregate(metrics, aggs, inputRowHolder, parseExceptionsHolder,
preserveExistingMetrics);
}
static long doAggregate(
@@ -433,7 +343,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
Aggregator[] aggs,
InputRowHolder inputRowHolder,
List<String> parseExceptionsHolder,
- boolean useMaxMemoryEstimates,
boolean preserveExistingMetrics
)
{
@@ -448,11 +357,7 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
agg = aggs[i];
}
try {
- if (useMaxMemoryEstimates) {
- agg.aggregate();
- } else {
- totalIncrementalBytes += agg.aggregateWithSize();
- }
+ totalIncrementalBytes += agg.aggregateWithSize();
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects
something but gets something else.
@@ -760,8 +665,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema
is null"),
maxRowCount,
maxBytesInMemory,
- preserveExistingMetrics,
- useMaxMemoryEstimates
+ preserveExistingMetrics
);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/StringDimensionIndexerTest.java
b/processing/src/test/java/org/apache/druid/segment/StringDimensionIndexerTest.java
index b43f3c1005f..7e8adc577b0 100644
---
a/processing/src/test/java/org/apache/druid/segment/StringDimensionIndexerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/StringDimensionIndexerTest.java
@@ -38,7 +38,6 @@ public class StringDimensionIndexerTest extends
InitializedNullHandlingTest
final StringDimensionIndexer indexer = new StringDimensionIndexer(
DimensionSchema.MultiValueHandling.SORTED_ARRAY,
true,
- false,
false
);
@@ -86,88 +85,19 @@ public class StringDimensionIndexerTest extends
InitializedNullHandlingTest
Assert.assertEquals(306L, totalEstimatedSize);
}
- @Test
- public void testProcessRowValsToEncodedKeyComponent_usingMaxEstimates()
- {
- final StringDimensionIndexer indexer = new StringDimensionIndexer(
- DimensionSchema.MultiValueHandling.SORTED_ARRAY,
- true,
- false,
- true
- );
-
- long totalEstimatedSize = 0L;
-
- // Verify size for a non-empty dimension value
- totalEstimatedSize += verifyEncodedValues(
- indexer,
- "abc",
- new int[]{0},
- 54L
- );
-
- // Verify size for null dimension value
- totalEstimatedSize += verifyEncodedValues(
- indexer,
- null,
- new int[]{1},
- 4L
- );
-
- // Verify size delta with repeated dimension value
- totalEstimatedSize += verifyEncodedValues(
- indexer,
- "abc",
- new int[]{0},
- 54L
- );
- // Verify size delta with newly added dimension value
- totalEstimatedSize += verifyEncodedValues(
- indexer,
- "def",
- new int[]{2},
- 54L
- );
-
- // Verify size delta for multi-values
- totalEstimatedSize += verifyEncodedValues(
- indexer,
- Arrays.asList("abc", "def", "ghi"),
- new int[]{0, 2, 3},
- 162L
- );
-
- Assert.assertEquals(328L, totalEstimatedSize);
- }
-
@Test
public void testProcessRowValsToEncodedKeyComponent_comparison()
{
- // Create indexers with useMaxMemoryEstimates = true/false
final StringDimensionIndexer indexerForAvgEstimates = new
StringDimensionIndexer(
DimensionSchema.MultiValueHandling.SORTED_ARRAY,
true,
- false,
false
);
- StringDimensionIndexer indexerForMaxEstimates = new StringDimensionIndexer(
- DimensionSchema.MultiValueHandling.SORTED_ARRAY,
- true,
- false,
- true
- );
// Verify sizes with newly added dimension values
- long totalSizeWithMaxEstimates = 0L;
long totalSizeWithAvgEstimates = 0L;
for (int i = 0; i < 10; ++i) {
final String dimValue = "value-" + i;
- totalSizeWithMaxEstimates += verifyEncodedValues(
- indexerForMaxEstimates,
- dimValue,
- new int[]{i},
- 62L
- );
totalSizeWithAvgEstimates += verifyEncodedValues(
indexerForAvgEstimates,
dimValue,
@@ -177,20 +107,12 @@ public class StringDimensionIndexerTest extends
InitializedNullHandlingTest
}
// If all dimension values are unique (or cardinality is high),
- // estimates with useMaxMemoryEstimates = false tend to be higher
- Assert.assertEquals(620L, totalSizeWithMaxEstimates);
Assert.assertEquals(940L, totalSizeWithAvgEstimates);
// Verify sizes with repeated dimension values
for (int i = 0; i < 100; ++i) {
final int index = i % 10;
final String dimValue = "value-" + index;
- totalSizeWithMaxEstimates += verifyEncodedValues(
- indexerForMaxEstimates,
- dimValue,
- new int[]{index},
- 62L
- );
totalSizeWithAvgEstimates += verifyEncodedValues(
indexerForAvgEstimates,
dimValue,
@@ -199,9 +121,6 @@ public class StringDimensionIndexerTest extends
InitializedNullHandlingTest
);
}
- // If dimension values are frequently repeated (cardinality is low),
- // estimates with useMaxMemoryEstimates = false tend to be much lower
- Assert.assertEquals(6820L, totalSizeWithMaxEstimates);
Assert.assertEquals(2940L, totalSizeWithAvgEstimates);
}
@@ -211,7 +130,6 @@ public class StringDimensionIndexerTest extends
InitializedNullHandlingTest
final StringDimensionIndexer indexer = new StringDimensionIndexer(
DimensionSchema.MultiValueHandling.SORTED_ARRAY,
true,
- false,
false
);
final byte[] byteVal = new byte[]{0x01, 0x02, 0x03, 0x04};
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
index fcf8c280e17..92ae8d0debc 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
@@ -53,7 +53,6 @@ public class IncrementalIndexRowSizeTest extends
InitializedNullHandlingTest
.setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
.setMaxRowCount(10_000)
.setMaxBytesInMemory(1_000)
- .setUseMaxMemoryEstimates(true)
.build())
);
}
@@ -77,8 +76,7 @@ public class IncrementalIndexRowSizeTest extends
InitializedNullHandlingTest
"B" // 50 Bytes
));
IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow();
- // 32 (timestamp + dims array + dimensionDescList) + 50 ("A") + 50 ("B")
- Assert.assertEquals(132, td1.estimateBytesInMemory());
+ Assert.assertEquals(196, td1.estimateBytesInMemory());
}
@Test
@@ -94,8 +92,7 @@ public class IncrementalIndexRowSizeTest extends
InitializedNullHandlingTest
Arrays.asList("A", "B") // 100 Bytes
));
IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow();
- // 32 (timestamp + dims array + dimensionDescList) + 50 ("A") + 100 ("A",
"B")
- Assert.assertEquals(182, td1.estimateBytesInMemory());
+ Assert.assertEquals(262, td1.estimateBytesInMemory());
}
@Test
@@ -111,8 +108,7 @@ public class IncrementalIndexRowSizeTest extends
InitializedNullHandlingTest
Arrays.asList("123", "abcdef") // 54 + 60 Bytes
));
IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow();
- // 32 (timestamp + dims array + dimensionDescList) + 60 ("nelson") + 114
("123", "abcdef")
- Assert.assertEquals(206, td1.estimateBytesInMemory());
+ Assert.assertEquals(286, td1.estimateBytesInMemory());
}
@Test
@@ -123,11 +119,10 @@ public class IncrementalIndexRowSizeTest extends
InitializedNullHandlingTest
IncrementalIndex.IncrementalIndexRowResult tndResult =
index.toIncrementalIndexRow(toMapRow(
time + 1,
"billy",
- "" // NullHandling.sqlCompatible() ? 48 Bytes : 4 Bytes
+ ""
));
IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow();
- // 28 (timestamp + dims array + dimensionDescList) + 4 OR 48 depending on
NullHandling.sqlCompatible()
- Assert.assertEquals(76, td1.estimateBytesInMemory());
+ Assert.assertEquals(108, td1.estimateBytesInMemory());
}
private MapBasedInputRow toMapRow(long time, Object... dimAndVal)
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 28b4379f7b9..92cfaabde21 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -60,7 +60,6 @@ public class Appenderators
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -91,7 +90,6 @@ public class Appenderators
cache,
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
}
@@ -107,7 +105,6 @@ public class Appenderators
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -125,7 +122,6 @@ public class Appenderators
indexMerger,
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index ec328c3b3cd..9ba7715ab6c 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -86,7 +86,6 @@ public interface AppenderatorsManager
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
);
@@ -106,7 +105,6 @@ public interface AppenderatorsManager
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
);
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index 2c6fe0f8769..e0dfa216528 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -120,7 +120,6 @@ public class BatchAppenderator implements Appenderator
private final IndexMerger indexMerger;
private final long maxBytesTuningConfig;
private final boolean skipBytesInMemoryOverheadCheck;
- private final boolean useMaxMemoryEstimates;
private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService pushExecutor = null;
@@ -171,7 +170,6 @@ public class BatchAppenderator implements Appenderator
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -189,7 +187,6 @@ public class BatchAppenderator implements Appenderator
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
skipBytesInMemoryOverheadCheck =
tuningConfig.isSkipBytesInMemoryOverheadCheck();
maxPendingPersists = tuningConfig.getMaxPendingPersists();
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
this.fingerprintGenerator = new FingerprintGenerator(objectMapper);
}
@@ -480,8 +477,7 @@ public class BatchAppenderator implements Appenderator
identifier.getVersion(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
- maxBytesTuningConfig,
- useMaxMemoryEstimates
+ maxBytesTuningConfig
);
bytesCurrentlyInMemory += calculateSinkMemoryInUsed();
sinks.put(identifier, retVal);
@@ -1073,7 +1069,6 @@ public class BatchAppenderator implements Appenderator
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
- useMaxMemoryEstimates,
hydrants
);
retVal.finishWriting(); // this sink is not writable
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index d613f3ff59c..ceb36520652 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -76,7 +76,6 @@ public class DummyForInjectionAppenderatorsManager implements
AppenderatorsManag
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -95,7 +94,6 @@ public class DummyForInjectionAppenderatorsManager implements
AppenderatorsManag
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 998f674daf7..cbb29c4a11b 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -82,7 +82,6 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -110,7 +109,6 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
cachePopulatorStats,
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
}
@@ -129,7 +127,6 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -148,7 +145,6 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
indexMerger,
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
return batchAppenderator;
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 8a7b3b535f5..2b37481a8ed 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -152,7 +152,6 @@ public class StreamAppenderator implements Appenderator
private final Set<SegmentIdWithShardSpec> droppingSinks =
Sets.newConcurrentHashSet();
private final long maxBytesTuningConfig;
private final boolean skipBytesInMemoryOverheadCheck;
- private final boolean useMaxMemoryEstimates;
private final QuerySegmentWalker texasRanger;
// This variable updated in add(), persist(), and drop()
@@ -230,7 +229,6 @@ public class StreamAppenderator implements Appenderator
Cache cache,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -251,7 +249,6 @@ public class StreamAppenderator implements Appenderator
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
skipBytesInMemoryOverheadCheck =
tuningConfig.isSkipBytesInMemoryOverheadCheck();
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
this.sinkSchemaAnnouncer = new SinkSchemaAnnouncer();
@@ -523,8 +520,7 @@ public class StreamAppenderator implements Appenderator
identifier.getVersion(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
- maxBytesTuningConfig,
- useMaxMemoryEstimates
+ maxBytesTuningConfig
);
bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed(retVal));
@@ -1411,7 +1407,6 @@ public class StreamAppenderator implements Appenderator
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
- useMaxMemoryEstimates,
hydrants
);
rowsSoFar += currSink.getNumRows();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 71157cea8d9..265ea104bc0 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -168,7 +168,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -193,7 +192,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
cache,
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
@@ -214,7 +212,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
- boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
@@ -235,7 +232,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
wrapIndexMerger(indexMerger),
rowIngestionMeters,
parseExceptionHandler,
- useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
datasourceBundle.addAppenderator(taskId, appenderator);
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
index 6d7e963208c..68c0e970123 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
@@ -66,7 +66,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
@@ -85,7 +84,6 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
- private final boolean useMaxMemoryEstimates;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new
CopyOnWriteArrayList<>();
private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
@@ -109,8 +107,7 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
String version,
AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
- long maxBytesInMemory,
- boolean useMaxMemoryEstimates
+ long maxBytesInMemory
)
{
this(
@@ -121,7 +118,6 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
- useMaxMemoryEstimates,
Collections.emptyList()
);
}
@@ -134,7 +130,6 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
long maxBytesInMemory,
- boolean useMaxMemoryEstimates,
List<FireHydrant> hydrants
)
{
@@ -145,7 +140,6 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
this.appendableIndexSpec = appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
- this.useMaxMemoryEstimates = useMaxMemoryEstimates;
int maxCount = -1;
for (int i = 0; i < hydrants.size(); ++i) {
@@ -306,7 +300,7 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
* Acquire references to all {@link FireHydrant} that represent this sink.
Returns null if they cannot all be
* acquired, possibly because they were closed (swapped to null)
concurrently with this method being called.
*
- * @param segmentMapFn from {@link
org.apache.druid.query.DataSource#createSegmentMapFunction(Query, AtomicLong)}
+ * @param segmentMapFn from {@link
org.apache.druid.query.DataSource#createSegmentMapFunction(Query)}
* @param skipIncrementalSegment whether in-memory {@link IncrementalIndex}
segments should be skipped
*/
@Nullable
@@ -336,7 +330,6 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
.setIndexSchema(indexSchema)
.setMaxRowCount(maxRowsInMemory)
.setMaxBytesInMemory(maxBytesInMemory)
- .setUseMaxMemoryEstimates(useMaxMemoryEstimates)
.build();
final FireHydrant old;
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
index 85bb173bce2..b80e33dfa18 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
@@ -298,12 +298,12 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
// 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = 1;
Assert.assertEquals(
- 182 + nullHandlingOverhead,
+ 190 + nullHandlingOverhead,
((BatchAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1),
null);
Assert.assertEquals(
- 182 + nullHandlingOverhead,
+ 190 + nullHandlingOverhead,
((BatchAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
appenderator.close();
@@ -330,10 +330,10 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1),
null);
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182
int nullHandlingOverhead = 1;
- Assert.assertEquals(182 + nullHandlingOverhead, ((BatchAppenderator)
appenderator).getBytesCurrentlyInMemory());
+ Assert.assertEquals(190 + nullHandlingOverhead, ((BatchAppenderator)
appenderator).getBytesCurrentlyInMemory());
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1),
null);
Assert.assertEquals(
- 364 + 2 * nullHandlingOverhead,
+ 380 + 2 * nullHandlingOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
Assert.assertEquals(2, appenderator.getSegments().size());
@@ -353,7 +353,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is
enabled
int nullHandlingOverhead = 1;
- int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int currentInMemoryIndexSize = 190 + nullHandlingOverhead;
int sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
Assert.assertEquals(
@@ -366,7 +366,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the same sink to cause persist.
- for (int i = 0; i < 53; i++) {
+ for (int i = 0; i < 50; i++) {
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" +
i, 1), null);
}
@@ -387,7 +387,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
// Add a single row after persisted
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1),
null);
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
- currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ currentInMemoryIndexSize = 190 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
@@ -398,7 +398,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the same sink to cause persist.
- for (int i = 0; i < 53; i++) {
+ for (int i = 0; i < 50; i++) {
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" +
i, 1), null);
}
@@ -476,7 +476,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
int nullHandlingOverhead = 1;
- int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int currentInMemoryIndexSize = 190 + nullHandlingOverhead;
int sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
@@ -506,7 +506,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is
enabled
int nullHandlingOverhead = 1;
- int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int currentInMemoryIndexSize = 190 + nullHandlingOverhead;
int sinkSizeOverhead = 2 * BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
Assert.assertEquals(
@@ -523,7 +523,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the same sink to cause persist.
- for (int i = 0; i < 49; i++) {
+ for (int i = 0; i < 47; i++) {
// these records are 186 bytes
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" +
i, 1), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" +
i, 1), null);
@@ -549,7 +549,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
// Add a single row after persisted to sink 0
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1),
null);
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
- currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ currentInMemoryIndexSize = 190 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((BatchAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
@@ -581,7 +581,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the both sink to cause persist.
- for (int i = 0; i < 49; i++) {
+ for (int i = 0; i < 47; i++) {
// 186 bytes
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" +
i, 1), null);
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" +
i, 1), null);
@@ -626,7 +626,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
//we still calculate the size even when ignoring it to make persist
decision
int nullHandlingOverhead = 1;
Assert.assertEquals(
- 182 + nullHandlingOverhead,
+ 190 + nullHandlingOverhead,
((BatchAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(1, ((BatchAppenderator)
appenderator).getRowsInMemory());
@@ -636,7 +636,7 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
// persisted:
int sinkSizeOverhead = 2 * BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
- (364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
+ (380 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
);
Assert.assertEquals(2, ((BatchAppenderator)
appenderator).getRowsInMemory());
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
index 6b19aaca3ca..19e75a712f3 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
@@ -239,7 +239,6 @@ public class BatchAppenderatorTester implements
AutoCloseable
indexMerger,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false,
Integer.MAX_VALUE, 0),
- true,
CentralizedDatasourceSchemaConfig.create()
);
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 72a221e9040..abb77fced0c 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -298,15 +298,14 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
- //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is
enabled
int nullHandlingOverhead = 1;
Assert.assertEquals(
- 182 + nullHandlingOverhead,
+ 190 + nullHandlingOverhead,
((StreamAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1),
committerSupplier);
Assert.assertEquals(
- 182 + nullHandlingOverhead,
+ 190 + nullHandlingOverhead,
((StreamAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
appenderator.close();
@@ -347,12 +346,12 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
- //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182
+ //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 62 (dimsKeySize) = 190
int nullHandlingOverhead = 1;
- Assert.assertEquals(182 + nullHandlingOverhead, ((StreamAppenderator)
appenderator).getBytesCurrentlyInMemory());
+ Assert.assertEquals(190 + nullHandlingOverhead, ((StreamAppenderator)
appenderator).getBytesCurrentlyInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1),
committerSupplier);
Assert.assertEquals(
- 364 + 2 * nullHandlingOverhead,
+ 380 + 2 * nullHandlingOverhead,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.close();
@@ -393,9 +392,9 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
- //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is
enabled
+ //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 62 (dimsKeySize) = 190 + 1 byte when null handling is
enabled
int nullHandlingOverhead = 1;
- int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int currentInMemoryIndexSize = 190 + nullHandlingOverhead;
int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
Assert.assertEquals(
@@ -408,7 +407,7 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the same sink to cause persist.
- for (int i = 0; i < 53; i++) {
+ for (int i = 0; i < 50; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
}
sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
@@ -433,7 +432,7 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
// Add a single row after persisted
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1),
committerSupplier);
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
- currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ currentInMemoryIndexSize = 190 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
@@ -444,7 +443,7 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the same sink to cause persist.
- for (int i = 0; i < 31; i++) {
+ for (int i = 0; i < 30; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
}
// currHydrant size is 0 since we just persist all indexes to disk.
@@ -591,7 +590,7 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
int nullHandlingOverhead = 1;
- int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int currentInMemoryIndexSize = 190 + nullHandlingOverhead;
int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
@@ -641,9 +640,9 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1),
committerSupplier);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
- //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is
enabled
+ //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 62 (dimsKeySize) = 190 + 1 byte when null handling is
enabled
int nullHandlingOverhead = 1;
- int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int currentInMemoryIndexSize = 190 + nullHandlingOverhead;
int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
Assert.assertEquals(
@@ -690,7 +689,7 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
// Add a single row after persisted to sink 0
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1),
committerSupplier);
// currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
- currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ currentInMemoryIndexSize = 190 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
@@ -719,9 +718,12 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the both sink to cause persist.
- for (int i = 0; i < 34; i++) {
+ for (int i = 0; i < 33; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
- appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1),
committerSupplier);
+ if (i < 32) {
+ // adding the last one would put us over the limit, so skip it for sink 1
+ appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1),
committerSupplier);
+ }
}
// currHydrant size is 0 since we just persist all indexes to disk.
currentInMemoryIndexSize = 0;
@@ -788,14 +790,14 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
//we still calculate the size even when ignoring it to make persist
decision
int nullHandlingOverhead = 1;
Assert.assertEquals(
- 182 + nullHandlingOverhead,
+ 190 + nullHandlingOverhead,
((StreamAppenderator)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(1, ((StreamAppenderator)
appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1),
committerSupplier);
int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
- (364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
+ (380 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
Assert.assertEquals(2, ((StreamAppenderator)
appenderator).getRowsInMemory());
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 6f9bf400b0f..e4617c28b17 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -246,7 +246,6 @@ public class StreamAppenderatorTester implements
AutoCloseable
new CachePopulatorStats(),
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false,
Integer.MAX_VALUE, 0),
- true,
centralizedDatasourceSchemaConfig
);
} else {
@@ -288,7 +287,6 @@ public class StreamAppenderatorTester implements
AutoCloseable
new CachePopulatorStats(),
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false,
Integer.MAX_VALUE, 0),
- true,
centralizedDatasourceSchemaConfig
);
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
index e92df0f6518..6dcfb6b8055 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
@@ -111,7 +111,6 @@ public class UnifiedIndexerAppenderatorsManagerTest extends
InitializedNullHandl
TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()),
new NoopRowIngestionMeters(),
new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0),
- true,
CentralizedDatasourceSchemaConfig.create()
);
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java
index 510901cd735..7ec6490be82 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/sink/SinkTest.java
@@ -93,8 +93,7 @@ public class SinkTest extends InitializedNullHandlingTest
version,
TuningConfig.DEFAULT_APPENDABLE_INDEX,
MAX_ROWS_IN_MEMORY,
- TuningConfig.DEFAULT_APPENDABLE_INDEX.getDefaultMaxBytesInMemory(),
- true
+ TuningConfig.DEFAULT_APPENDABLE_INDEX.getDefaultMaxBytesInMemory()
);
sink.add(
@@ -276,8 +275,7 @@ public class SinkTest extends InitializedNullHandlingTest
version,
TuningConfig.DEFAULT_APPENDABLE_INDEX,
MAX_ROWS_IN_MEMORY,
- TuningConfig.DEFAULT_APPENDABLE_INDEX.getDefaultMaxBytesInMemory(),
- true
+ TuningConfig.DEFAULT_APPENDABLE_INDEX.getDefaultMaxBytesInMemory()
);
sink.add(new MapBasedInputRow(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]