This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3766e3a14fb Automatically determine maxColumnsToMerge for streaming
ingestion. (#17917)
3766e3a14fb is described below
commit 3766e3a14fb49ca98ab3b02495bf1336c7eee75c
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 16 18:59:24 2025 -0700
Automatically determine maxColumnsToMerge for streaming ingestion. (#17917)
* Automatically determine maxColumnsToMerge for streaming ingestion.
Borrowing from the algorithm used by MSQ, SeekableStreamIndexTask now
also automatically determines maxColumnsToMerge when it is set to -1
(the default). The algorithm uses the minimum of:
- A figure derived from heap memory, assuming an equal amount of memory
is devoted to indexing as to merging. (This is also assumed when
computing the setting of maxBytesInMemory.)
- A figure derived from offheap memory, assuming we can use what isn't
used for processing buffers.
* Add test coverage.
* Reword.
---
.../druid/msq/exec/WorkerMemoryParameters.java | 10 +-
.../apache/druid/indexing/common/TaskToolbox.java | 22 ++
.../druid/indexing/common/TaskToolboxFactory.java | 5 +
.../SeekableStreamAppenderatorConfig.java | 283 +++++++++++++++++++++
.../seekablestream/SeekableStreamIndexTask.java | 7 +-
.../SeekableStreamIndexTaskTuningConfig.java | 22 +-
.../druid/indexing/common/TaskToolboxTest.java | 2 +
.../overlord/SingleTaskBackgroundRunnerTest.java | 2 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 +
.../indexing/overlord/TestTaskToolboxFactory.java | 9 +
.../SeekableStreamAppenderatorConfigTest.java | 263 +++++++++++++++++++
.../SeekableStreamIndexTaskTestBase.java | 2 +
.../indexing/worker/WorkerTaskManagerTest.java | 1 +
.../indexing/worker/WorkerTaskMonitorTest.java | 1 +
.../org/apache/druid/segment/IndexMergerV9.java | 14 +-
.../realtime/appenderator/AppenderatorConfig.java | 3 +-
16 files changed, 619 insertions(+), 29 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index d7c9b9a1073..4eccd5fe317 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.SuperSorter;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamAppenderatorConfig;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
@@ -80,11 +81,6 @@ public class WorkerMemoryParameters
*/
private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
- /**
- * (Very) rough estimate of the on-heap overhead of reading a column.
- */
- private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
-
/**
* Maximum percent of each bundle's free memory that will be used for
maxRetainedBytes of
* {@link ClusterByStatisticsCollectorImpl}.
@@ -305,7 +301,9 @@ public class WorkerMemoryParameters
public int getAppenderatorMaxColumnsToMerge()
{
// Half for indexing, half for merging.
- return Ints.checkedCast(Math.max(2, getAppenderatorMemory() / 2 /
APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN));
+ final long calculatedMaxColumnsToMerge =
+ getAppenderatorMemory() / 2 /
SeekableStreamAppenderatorConfig.APPENDERATOR_MERGE_ROUGH_HEAP_MEMORY_PER_COLUMN;
+ return Ints.checkedCast(Math.max(2, calculatedMaxColumnsToMerge));
}
public int getFrameSize()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 9732d72eaf5..041ab12e790 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -99,6 +100,11 @@ public class TaskToolbox
* because it may be unavailable, e. g. for batch tasks running in Spark or
Hadoop.
*/
private final Provider<QueryRunnerFactoryConglomerate>
queryRunnerFactoryConglomerateProvider;
+ /**
+ * Using Provider, not {@link DruidProcessingConfig} directly, because it
may not be available for task
+ * types that do not run queries.
+ */
+ private final Provider<DruidProcessingConfig> processingConfigProvider;
@Nullable
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final QueryProcessingPool queryProcessingPool;
@@ -148,6 +154,7 @@ public class TaskToolbox
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate>
queryRunnerFactoryConglomerateProvider,
+ Provider<DruidProcessingConfig> processingConfigProvider,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
@Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
@@ -191,6 +198,7 @@ public class TaskToolbox
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider =
queryRunnerFactoryConglomerateProvider;
+ this.processingConfigProvider = processingConfigProvider;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
@@ -287,6 +295,11 @@ public class TaskToolbox
return queryRunnerFactoryConglomerateProvider.get();
}
+ public DruidProcessingConfig getProcessingConfig()
+ {
+ return processingConfigProvider.get();
+ }
+
public QueryProcessingPool getQueryProcessingPool()
{
return queryProcessingPool;
@@ -536,6 +549,7 @@ public class TaskToolbox
private DataSegmentServerAnnouncer serverAnnouncer;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
private Provider<QueryRunnerFactoryConglomerate>
queryRunnerFactoryConglomerateProvider;
+ private Provider<DruidProcessingConfig> processingConfigProvider;
private QueryProcessingPool queryProcessingPool;
private JoinableFactory joinableFactory;
private Provider<MonitorScheduler> monitorSchedulerProvider;
@@ -584,6 +598,7 @@ public class TaskToolbox
this.serverAnnouncer = other.serverAnnouncer;
this.handoffNotifierFactory = other.handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider =
other.queryRunnerFactoryConglomerateProvider;
+ this.processingConfigProvider = other.processingConfigProvider;
this.queryProcessingPool = other.queryProcessingPool;
this.joinableFactory = other.joinableFactory;
this.monitorSchedulerProvider = other.monitorSchedulerProvider;
@@ -690,6 +705,12 @@ public class TaskToolbox
return this;
}
+ public Builder processingConfigProvider(final
Provider<DruidProcessingConfig> processingConfigProvider)
+ {
+ this.processingConfigProvider = processingConfigProvider;
+ return this;
+ }
+
public Builder queryProcessingPool(final QueryProcessingPool
queryProcessingPool)
{
this.queryProcessingPool = queryProcessingPool;
@@ -874,6 +895,7 @@ public class TaskToolbox
serverAnnouncer,
handoffNotifierFactory,
queryRunnerFactoryConglomerateProvider,
+ processingConfigProvider,
queryProcessingPool,
joinableFactory,
monitorSchedulerProvider,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 7190c38849f..9084c330655 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -44,6 +44,7 @@ import
org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.rpc.StandardRetryPolicy;
@@ -88,6 +89,7 @@ public class TaskToolboxFactory
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final Provider<QueryRunnerFactoryConglomerate>
queryRunnerFactoryConglomerateProvider;
+ private final Provider<DruidProcessingConfig> processingConfigProvider;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final Provider<MonitorScheduler> monitorSchedulerProvider;
@@ -133,6 +135,7 @@ public class TaskToolboxFactory
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate>
queryRunnerFactoryConglomerateProvider,
+ Provider<DruidProcessingConfig> processingConfigProvider,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Provider<MonitorScheduler> monitorSchedulerProvider,
@@ -175,6 +178,7 @@ public class TaskToolboxFactory
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider =
queryRunnerFactoryConglomerateProvider;
+ this.processingConfigProvider = processingConfigProvider;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
@@ -231,6 +235,7 @@ public class TaskToolboxFactory
.serverAnnouncer(serverAnnouncer)
.handoffNotifierFactory(handoffNotifierFactory)
.queryRunnerFactoryConglomerateProvider(queryRunnerFactoryConglomerateProvider)
+ .processingConfigProvider(processingConfigProvider)
.queryProcessingPool(queryProcessingPool)
.joinableFactory(joinableFactory)
.monitorSchedulerProvider(monitorSchedulerProvider)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
new file mode 100644
index 00000000000..f6490579c75
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+public class SeekableStreamAppenderatorConfig implements AppenderatorConfig
+{
+ private static final Logger log = new
Logger(SeekableStreamAppenderatorConfig.class);
+
+ /**
+ * Rough estimate of the on-heap overhead of reading a column. Determined
empirically through heap dumps.
+ */
+ public static final int APPENDERATOR_MERGE_ROUGH_HEAP_MEMORY_PER_COLUMN =
3_000;
+
+ /**
+ * Rough estimate of the off-heap overhead of reading a column. This is 1.5x
the size of a decompression buffer.
+ * The 1.5x allows for some headroom for unaccounted-for memory usage. The
decompression buffer size is about right
+ * when a compressing {@link IndexSpec} is in play. It is an overestimate
when compression is disabled for
+ * intermediate persists, but this isn't currently the default behavior, so
we want to be conservative here.
+ *
+ * See also {@link AppenderatorConfig#getIndexSpecForIntermediatePersists()}
for how compression can be disabled
+ * for intermediate persists.
+ */
+ public static final int APPENDERATOR_MERGE_ROUGH_DIRECT_MEMORY_PER_COLUMN =
(int) ((1 << 16) * 1.5);
+
+ private final SeekableStreamIndexTaskTuningConfig tuningConfig;
+ private final int maxColumnsToMerge;
+
+ private SeekableStreamAppenderatorConfig(SeekableStreamIndexTaskTuningConfig
tuningConfig, int maxColumnsToMerge)
+ {
+ this.tuningConfig = tuningConfig;
+ this.maxColumnsToMerge = maxColumnsToMerge;
+ }
+
+ public static SeekableStreamAppenderatorConfig fromTuningConfig(
+ final SeekableStreamIndexTaskTuningConfig tuningConfig,
+ @Nullable final DruidProcessingConfig processingConfig
+ )
+ {
+ if (processingConfig == null) {
+ return new SeekableStreamAppenderatorConfig(tuningConfig,
tuningConfig.getMaxColumnsToMerge());
+ } else {
+ final int maxColumnsToMerge;
+ if (tuningConfig.getMaxColumnsToMerge() ==
SeekableStreamIndexTaskTuningConfig.DEFAULT_MAX_COLUMNS_TO_MERGE) {
+ maxColumnsToMerge = calculateDefaultMaxColumnsToMerge(
+ JvmUtils.getRuntimeInfo(),
+ processingConfig,
+ tuningConfig
+ );
+ } else {
+ maxColumnsToMerge = tuningConfig.getMaxColumnsToMerge();
+ }
+
+ return new SeekableStreamAppenderatorConfig(tuningConfig,
maxColumnsToMerge);
+ }
+ }
+
+ @Override
+ public boolean isReportParseExceptions()
+ {
+ return tuningConfig.isReportParseExceptions();
+ }
+
+ @Override
+ public int getMaxPendingPersists()
+ {
+ return tuningConfig.getMaxPendingPersists();
+ }
+
+ @Override
+ public boolean isSkipBytesInMemoryOverheadCheck()
+ {
+ return tuningConfig.isSkipBytesInMemoryOverheadCheck();
+ }
+
+ @Override
+ public Period getIntermediatePersistPeriod()
+ {
+ return tuningConfig.getIntermediatePersistPeriod();
+ }
+
+ @Override
+ public File getBasePersistDirectory()
+ {
+ return tuningConfig.getBasePersistDirectory();
+ }
+
+ @Override
+ public AppenderatorConfig withBasePersistDirectory(File basePersistDirectory)
+ {
+ return new SeekableStreamAppenderatorConfig(
+ tuningConfig.withBasePersistDirectory(basePersistDirectory),
+ maxColumnsToMerge
+ );
+ }
+
+ @Override
+ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+ {
+ return tuningConfig.getSegmentWriteOutMediumFactory();
+ }
+
+ @Override
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return tuningConfig.getAppendableIndexSpec();
+ }
+
+ @Override
+ public int getMaxRowsInMemory()
+ {
+ return tuningConfig.getMaxRowsInMemory();
+ }
+
+ @Override
+ public long getMaxBytesInMemory()
+ {
+ return tuningConfig.getMaxBytesInMemory();
+ }
+
+ @Override
+ public PartitionsSpec getPartitionsSpec()
+ {
+ return tuningConfig.getPartitionsSpec();
+ }
+
+ @Override
+ public IndexSpec getIndexSpec()
+ {
+ return tuningConfig.getIndexSpec();
+ }
+
+ @Override
+ public IndexSpec getIndexSpecForIntermediatePersists()
+ {
+ return tuningConfig.getIndexSpecForIntermediatePersists();
+ }
+
+ @Override
+ public int getNumPersistThreads()
+ {
+ return tuningConfig.getNumPersistThreads();
+ }
+
+ @Override
+ public Integer getMaxRowsPerSegment()
+ {
+ return tuningConfig.getMaxRowsPerSegment();
+ }
+
+ @Override
+ public Long getMaxTotalRows()
+ {
+ return tuningConfig.getMaxTotalRows();
+ }
+
+ @Override
+ public int getMaxColumnsToMerge()
+ {
+ return maxColumnsToMerge;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SeekableStreamAppenderatorConfig that = (SeekableStreamAppenderatorConfig)
o;
+ return maxColumnsToMerge == that.maxColumnsToMerge &&
Objects.equals(tuningConfig, that.tuningConfig);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(tuningConfig, maxColumnsToMerge);
+ }
+
+ /**
+ * Calculates the value to use for "maxColumnsToMerge" when it has been set
to
+ * {@link SeekableStreamIndexTaskTuningConfig#DEFAULT_MAX_COLUMNS_TO_MERGE}.
+ */
+ static int calculateDefaultMaxColumnsToMerge(
+ final RuntimeInfo runtimeInfo,
+ final DruidProcessingConfig processingConfig,
+ final TuningConfig tuningConfig
+ )
+ {
+ final int maxColumnsToMergeBasedOnHeapMemory =
getMaxColumnsToMergeBasedOnHeapMemory(tuningConfig);
+ final int maxColumnsToMergeBasedOnDirectMemory =
+ getMaxColumnsToMergeBasedOnDirectMemory(runtimeInfo, processingConfig);
+
+ return Math.min(maxColumnsToMergeBasedOnHeapMemory,
maxColumnsToMergeBasedOnDirectMemory);
+ }
+
+ /**
+ * Calculates the value to use for "maxColumnsToMerge" based purely on heap
memory.
+ */
+ static int getMaxColumnsToMergeBasedOnHeapMemory(TuningConfig tuningConfig)
+ {
+ final long heapMemoryForMerging;
+
+ if (tuningConfig.getMaxBytesInMemory() >= 0) {
+ // maxBytesInMemory is active and represents the amount of heap memory
used for indexing.
+ // Use an equal amount of heap memory for merging.
+ heapMemoryForMerging = tuningConfig.getMaxBytesInMemoryOrDefault();
+ } else {
+ // maxBytesInMemory was set to unlimited by the user; use what that
_would_ be if it was set to automatic.
+ heapMemoryForMerging =
tuningConfig.getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+ }
+
+ return (int) Math.min(heapMemoryForMerging /
APPENDERATOR_MERGE_ROUGH_HEAP_MEMORY_PER_COLUMN, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Calculates the value to use for "maxColumnsToMerge" based purely on
direct memory.
+ */
+ static int getMaxColumnsToMergeBasedOnDirectMemory(
+ final RuntimeInfo runtimeInfo,
+ final DruidProcessingConfig processingConfig
+ )
+ {
+ // Queries require one processing buffer per thread, plus all the merge
buffers.
+ final long directMemoryForQueries =
+ (long) processingConfig.intermediateComputeSizeBytes() *
+ (processingConfig.getNumThreads() +
processingConfig.getNumMergeBuffers());
+
+ long directMemorySizeBytes;
+
+ try {
+ directMemorySizeBytes = runtimeInfo.getDirectMemorySizeBytes();
+ }
+ catch (UnsupportedOperationException e) {
+ // Cannot find direct memory, assume it was configured according to our
guidelines (at least big enough to
+ // hold one extra processing buffer).
+ directMemorySizeBytes = directMemoryForQueries +
processingConfig.intermediateComputeSizeBytes();
+
+ log.noStackTrace().warn(
+ e,
+ "Ignoring direct memory when sizing maxColumnsToMerge; cannot
retrieve. "
+ + "Assuming direct memory is at least [%,d] bytes.",
+ directMemorySizeBytes
+ );
+ }
+
+ return (int) Math.min(
+ (directMemorySizeBytes - directMemoryForQueries) /
APPENDERATOR_MERGE_ROUGH_DIRECT_MEMORY_PER_COLUMN,
+ Integer.MAX_VALUE
+ );
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 1abff409229..2550408ddc0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -41,7 +41,6 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@@ -62,7 +61,6 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
extends AbstractTask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
- private static final EmittingLogger log = new
EmittingLogger(SeekableStreamIndexTask.class);
protected final DataSchema dataSchema;
protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
@@ -195,7 +193,10 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
toolbox.getSegmentLoaderConfig(),
getId(),
dataSchema,
- tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
+ SeekableStreamAppenderatorConfig.fromTuningConfig(
+ tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
+ toolbox.getProcessingConfig()
+ ),
metrics,
toolbox.getSegmentPusher(),
toolbox.getJsonMapper(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 309336e1d53..124682bcb2e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -33,15 +33,15 @@ import java.io.File;
import java.time.Duration;
import java.util.Objects;
-public abstract class SeekableStreamIndexTaskTuningConfig implements
AppenderatorConfig
+public abstract class SeekableStreamIndexTaskTuningConfig implements
TuningConfig
{
+ public static final int DEFAULT_MAX_COLUMNS_TO_MERGE = -1;
private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK
= false;
private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new
Period("PT10M");
private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT =
Duration.ofMinutes(15).toMillis();
- private static final int DEFAULT_MAX_COLUMNS_TO_MERGE = -1;
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
@@ -140,8 +140,11 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
this.logParseExceptions = logParseExceptions == null
? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
: logParseExceptions;
- this.numPersistThreads = numPersistThreads == null ?
- DEFAULT_NUM_PERSIST_THREADS :
Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS);
+ if (numPersistThreads == null) {
+ this.numPersistThreads = AppenderatorConfig.DEFAULT_NUM_PERSIST_THREADS;
+ } else {
+ this.numPersistThreads = Math.max(numPersistThreads,
AppenderatorConfig.DEFAULT_NUM_PERSIST_THREADS);
+ }
this.maxColumnsToMerge = maxColumnsToMerge == null ?
DEFAULT_MAX_COLUMNS_TO_MERGE : maxColumnsToMerge;
}
@@ -167,13 +170,11 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
}
@JsonProperty
- @Override
public boolean isSkipBytesInMemoryOverheadCheck()
{
return skipBytesInMemoryOverheadCheck;
}
- @Override
@JsonProperty
public Integer getMaxRowsPerSegment()
{
@@ -181,7 +182,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
}
@JsonProperty
- @Override
@Nullable
public Long getMaxTotalRows()
{
@@ -194,20 +194,17 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
return partitionsSpec;
}
- @Override
@JsonProperty
public Period getIntermediatePersistPeriod()
{
return intermediatePersistPeriod;
}
- @Override
public File getBasePersistDirectory()
{
return basePersistDirectory;
}
- @Override
@JsonProperty
@Deprecated
public int getMaxPendingPersists()
@@ -229,7 +226,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
return indexSpecForIntermediatePersists;
}
- @Override
@JsonProperty
public boolean isReportParseExceptions()
{
@@ -248,7 +244,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
return resetOffsetAutomatically;
}
- @Override
@JsonProperty
@Nullable
public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
@@ -286,21 +281,18 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
return skipSequenceNumberAvailabilityCheck;
}
- @Override
@JsonProperty
public int getNumPersistThreads()
{
return numPersistThreads;
}
- @Override
@JsonProperty
public int getMaxColumnsToMerge()
{
return maxColumnsToMerge;
}
- @Override
public abstract SeekableStreamIndexTaskTuningConfig
withBasePersistDirectory(File dir);
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 25aa3c4a089..8f00f7961ff 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -35,6 +35,7 @@ import
org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -130,6 +131,7 @@ public class TaskToolboxTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
mockHandoffNotifierFactory,
() -> mockQueryRunnerFactoryConglomerate,
+ DruidProcessingConfig::new,
mockQueryProcessingPool,
NoopJoinableFactory.INSTANCE,
() -> mockMonitorScheduler,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 5590ce608ec..e5be345fb9e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.scan.ScanResultValue;
@@ -111,6 +112,7 @@ public class SingleTaskBackgroundRunnerTest
null,
null,
null,
+ DruidProcessingConfig::new,
null,
NoopJoinableFactory.INSTANCE,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index e6535fdadc9..75bdd55d395 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -106,6 +106,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DirectQueryProcessingPool;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
@@ -586,6 +587,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory
conglomerate corporation unionized collective
+ DruidProcessingConfig::new,
DirectQueryProcessingPool.INSTANCE, // query executor service
NoopJoinableFactory.INSTANCE,
() -> monitorScheduler, // monitor scheduler
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
index 3f3081b0e69..b56da16de0c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
@@ -41,6 +41,7 @@ import
org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
@@ -92,6 +93,7 @@ public class TestTaskToolboxFactory extends TaskToolboxFactory
bob.serverAnnouncer,
bob.handoffNotifierFactory,
bob.queryRunnerFactoryConglomerateProvider,
+ bob.processingConfigProvider,
bob.queryProcessingPool,
bob.joinableFactory,
bob.monitorSchedulerProvider,
@@ -136,6 +138,7 @@ public class TestTaskToolboxFactory extends
TaskToolboxFactory
private DataSegmentServerAnnouncer serverAnnouncer;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
private Provider<QueryRunnerFactoryConglomerate>
queryRunnerFactoryConglomerateProvider;
+ private Provider<DruidProcessingConfig> processingConfigProvider;
private QueryProcessingPool queryProcessingPool;
private JoinableFactory joinableFactory;
private Provider<MonitorScheduler> monitorSchedulerProvider;
@@ -236,6 +239,12 @@ public class TestTaskToolboxFactory extends
TaskToolboxFactory
return this;
}
+ public Builder setProcessingConfigProvider(Provider<DruidProcessingConfig>
processingConfigProvider)
+ {
+ this.processingConfigProvider = processingConfigProvider;
+ return this;
+ }
+
public Builder setQueryProcessingPool(QueryProcessingPool
queryProcessingPool)
{
this.queryProcessingPool = queryProcessingPool;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
new file mode 100644
index 00000000000..2008ee09360
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.query.DruidProcessingBufferConfig;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.DruidProcessingConfigTest;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.AppendableIndexBuilder;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.utils.RuntimeInfo;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+
+public class SeekableStreamAppenderatorConfigTest
+{
+ @Test
+ public void test_fromTuningConfig()
+ {
+ final SeekableStreamIndexTaskTuningConfig tuningConfig =
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
+ Mockito.when(tuningConfig.isReportParseExceptions()).thenReturn(true);
+ Mockito.when(tuningConfig.getMaxPendingPersists()).thenReturn(2);
+
Mockito.when(tuningConfig.isSkipBytesInMemoryOverheadCheck()).thenReturn(true);
+
Mockito.when(tuningConfig.getIntermediatePersistPeriod()).thenReturn(Period.days(3));
+ Mockito.when(tuningConfig.getBasePersistDirectory()).thenReturn(new
File("/nonexistent/tmp"));
+ Mockito.when(tuningConfig.getMaxRowsInMemory()).thenReturn(11);
+ Mockito.when(tuningConfig.getMaxBytesInMemory()).thenReturn(12L);
+ Mockito.when(tuningConfig.getIndexSpec())
+
.thenReturn(IndexSpec.builder().withMetricCompression(CompressionStrategy.NONE).build());
+ Mockito.when(tuningConfig.getIndexSpecForIntermediatePersists())
+
.thenReturn(IndexSpec.builder().withMetricCompression(CompressionStrategy.NONE).build());
+ Mockito.when(tuningConfig.getNumPersistThreads()).thenReturn(13);
+ Mockito.when(tuningConfig.getMaxRowsPerSegment()).thenReturn(14);
+ Mockito.when(tuningConfig.getMaxTotalRows()).thenReturn(15L);
+ Mockito.when(tuningConfig.getMaxColumnsToMerge()).thenReturn(16);
+
Mockito.when(tuningConfig.isSkipBytesInMemoryOverheadCheck()).thenReturn(true);
+ Mockito.when(tuningConfig.getMaxColumnsToMerge()).thenReturn(123);
+
+ final SeekableStreamAppenderatorConfig appenderatorConfig =
+ SeekableStreamAppenderatorConfig.fromTuningConfig(tuningConfig, null);
+
+ Assert.assertEquals(tuningConfig.isReportParseExceptions(),
appenderatorConfig.isReportParseExceptions());
+ Assert.assertEquals(tuningConfig.getMaxPendingPersists(),
appenderatorConfig.getMaxPendingPersists());
+ Assert.assertEquals(
+ tuningConfig.isSkipBytesInMemoryOverheadCheck(),
+ appenderatorConfig.isSkipBytesInMemoryOverheadCheck()
+ );
+ Assert.assertEquals(tuningConfig.getIntermediatePersistPeriod(),
appenderatorConfig.getIntermediatePersistPeriod());
+ Assert.assertEquals(tuningConfig.getBasePersistDirectory(),
appenderatorConfig.getBasePersistDirectory());
+ Assert.assertEquals(tuningConfig.getMaxRowsInMemory(),
appenderatorConfig.getMaxRowsInMemory());
+ Assert.assertEquals(tuningConfig.getMaxBytesInMemory(),
appenderatorConfig.getMaxBytesInMemory());
+ Assert.assertEquals(tuningConfig.getIndexSpec(),
appenderatorConfig.getIndexSpec());
+ Assert.assertEquals(
+ tuningConfig.getIndexSpecForIntermediatePersists(),
+ appenderatorConfig.getIndexSpecForIntermediatePersists()
+ );
+ Assert.assertEquals(tuningConfig.getNumPersistThreads(),
appenderatorConfig.getNumPersistThreads());
+ Assert.assertEquals(tuningConfig.getMaxRowsPerSegment(),
appenderatorConfig.getMaxRowsPerSegment());
+ Assert.assertEquals(tuningConfig.getMaxTotalRows(),
appenderatorConfig.getMaxTotalRows());
+ Assert.assertEquals(tuningConfig.getMaxColumnsToMerge(),
appenderatorConfig.getMaxColumnsToMerge());
+ Assert.assertEquals(
+ tuningConfig.isSkipBytesInMemoryOverheadCheck(),
+ appenderatorConfig.isSkipBytesInMemoryOverheadCheck()
+ );
+ Assert.assertEquals(tuningConfig.getMaxColumnsToMerge(),
appenderatorConfig.getMaxColumnsToMerge());
+ }
+
+ @Test
+ public void
test_calculateDefaultMaxColumnsToMerge_direct2g_xmx1g_maxBytesAuto_2proc_1merge()
+ {
+ Assert.assertEquals(
+ 17293,
+ SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+ new DruidProcessingConfigTest.MockRuntimeInfo(2, 2_000_000_000L,
1_000_000_000L),
+ new MockProcessingConfig(2, 1, 100_000_000),
+ new MockTuningConfig(0, 500_000_000L)
+ )
+ );
+ }
+
+ @Test
+ public void
test_calculateDefaultMaxColumnsToMerge_direct2g_xmx1g_maxBytesUnlimited_2proc_1merge()
+ {
+ Assert.assertEquals(
+ 17293,
+ SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+ new DruidProcessingConfigTest.MockRuntimeInfo(2, 2_000_000_000L,
1_000_000_000L),
+ new MockProcessingConfig(2, 1, 100_000_000),
+ new MockTuningConfig(-1, 500_000_000L)
+ )
+ );
+ }
+
+ @Test
+ public void
test_calculateDefaultMaxColumnsToMerge_direct1800m_xmx1g_maxBytesUnlimited_2proc_1merge()
+ {
+ Assert.assertEquals(
+ 15258,
+ SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+ new DruidProcessingConfigTest.MockRuntimeInfo(2, 1_800_000_000L,
1_000_000_000L),
+ new MockProcessingConfig(2, 1, 100_000_000),
+ new MockTuningConfig(-1, 500_000_000L)
+ )
+ );
+ }
+
+ @Test
+ public void
test_calculateDefaultMaxColumnsToMerge_direct1800m_xmx1g_maxBytesUnlimited_3proc_2merge()
+ {
+ Assert.assertEquals(
+ 13224,
+ SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+ new DruidProcessingConfigTest.MockRuntimeInfo(3, 1_800_000_000L,
1_000_000_000L),
+ new MockProcessingConfig(3, 2, 100_000_000),
+ new MockTuningConfig(-1, 500_000_000L)
+ )
+ );
+ }
+
+ @Test
+ public void
test_calculateDefaultMaxColumnsToMerge_direct2g_xmx1g_maxBytes20m_2proc_1merge()
+ {
+ Assert.assertEquals(
+ 6666,
+ SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+ new DruidProcessingConfigTest.MockRuntimeInfo(2, 2_000_000_000L,
1_000_000_000L),
+ new MockProcessingConfig(2, 1, 100_000_000),
+ new MockTuningConfig(20_000_000L, 500_000_000L)
+ )
+ );
+ }
+
+ @Test
+ public void
test_calculateDefaultMaxColumnsToMerge_directUnsupported_xmx1g_maxBytes20m_2proc_1merge()
+ {
+ Assert.assertEquals(
+ 1017,
+ SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+ new RuntimeInfo()
+ {
+ @Override
+ public long getDirectMemorySizeBytes()
+ {
+ throw new UnsupportedOperationException();
+ }
+ },
+ new MockProcessingConfig(2, 1, 100_000_000),
+ new MockTuningConfig(20_000_000L, 500_000_000L)
+ )
+ );
+ }
+
+ @Test
+ public void test_equals()
+ {
+ EqualsVerifier.forClass(SeekableStreamAppenderatorConfig.class)
+ .usingGetClass()
+ .verify();
+ }
+
+ private static class MockProcessingConfig extends DruidProcessingConfig
+ {
+ public MockProcessingConfig(final int numThreads, final int
numMergeBuffers, final int bufferSize)
+ {
+ super(
+ null,
+ numThreads,
+ numMergeBuffers,
+ null,
+ null,
+ new
DruidProcessingBufferConfig(HumanReadableBytes.valueOf(bufferSize), null, null),
+ null
+ );
+ }
+ }
+
+ private static class MockTuningConfig implements TuningConfig
+ {
+ private final long configuredMaxBytesInMemory;
+ private final long defaultMaxBytesInMemory;
+
+ public MockTuningConfig(long configuredMaxBytesInMemory, long
defaultMaxBytesInMemory)
+ {
+ this.configuredMaxBytesInMemory = configuredMaxBytesInMemory;
+ this.defaultMaxBytesInMemory = defaultMaxBytesInMemory;
+ }
+
+ @Override
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return new AppendableIndexSpec()
+ {
+ @Override
+ public AppendableIndexBuilder builder()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getDefaultMaxBytesInMemory()
+ {
+ return defaultMaxBytesInMemory;
+ }
+ };
+ }
+
+ @Override
+ public int getMaxRowsInMemory()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxBytesInMemory()
+ {
+ return configuredMaxBytesInMemory;
+ }
+
+ @Override
+ public PartitionsSpec getPartitionsSpec()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IndexSpec getIndexSpec()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IndexSpec getIndexSpecForIntermediatePersists()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 9e82196f811..8c0bdf56ed5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -91,6 +91,7 @@ import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
import org.apache.druid.query.DirectQueryProcessingPool;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -683,6 +684,7 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeQueryRunnerConglomerate,
+ DruidProcessingConfig::new,
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 37839f8e077..930e2fc7d7c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -143,6 +143,7 @@ public class WorkerTaskManagerTest
notifierFactory,
null,
null,
+ null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentCacheManagerFactory(TestIndex.INDEX_IO, jsonMapper),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index c8a75462535..8e6577a86e8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -185,6 +185,7 @@ public class WorkerTaskMonitorTest
notifierFactory,
null,
null,
+ null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentCacheManagerFactory(TestIndex.INDEX_IO, jsonMapper),
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 658242a6ed7..4afd7afdc0e 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -1204,12 +1204,18 @@ public class IndexMergerV9 implements IndexMerger
List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes,
maxColumnsToMerge);
List<File> currentOutputs = new ArrayList<>();
- log.debug("base outDir: " + outDir);
+ log.debug("Base outDir[%s]", outDir);
try {
int tierCounter = 0;
while (true) {
- log.info("Merging %d phases, tiers finished processed so far: %d.",
currentPhases.size(), tierCounter);
+ log.info(
+ "Merging phases[%,d] (indexes[%,d], maxColumnsToMerge[%,d]), tiers
finished processed so far[%,d].",
+ currentPhases.size(),
+ indexes.size(),
+ maxColumnsToMerge,
+ tierCounter
+ );
for (List<IndexableAdapter> phase : currentPhases) {
final File phaseOutDir;
final boolean isFinalPhase = currentPhases.size() == 1;
@@ -1221,8 +1227,8 @@ public class IndexMergerV9 implements IndexMerger
phaseOutDir = FileUtils.createTempDir();
tempDirs.add(phaseOutDir);
}
- log.info("Merging phase with %d indexes.", phase.size());
- log.debug("phase outDir: " + phaseOutDir);
+ log.info("Merging phase with index count[%,d].", phase.size());
+ log.debug("Phase outDir[%s]", phaseOutDir);
File phaseOutput = merge(
phase,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index ab1b09b772c..7e5f1b3f25f 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.realtime.appenderator;
+import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@@ -74,6 +75,6 @@ public interface AppenderatorConfig extends TuningConfig
default int getMaxColumnsToMerge()
{
- return -1;
+ return IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]