This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3766e3a14fb Automatically determine maxColumnsToMerge for streaming 
ingestion. (#17917)
3766e3a14fb is described below

commit 3766e3a14fb49ca98ab3b02495bf1336c7eee75c
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 16 18:59:24 2025 -0700

    Automatically determine maxColumnsToMerge for streaming ingestion. (#17917)
    
    * Automatically determine maxColumnsToMerge for streaming ingestion.
    
    Borrowing from the algorithm used by MSQ, SeekableStreamIndexTask now
    also automatically determines maxColumnsToMerge when it is set to -1
    (the default). The algorithm uses the minimum of:
    
    - A figure derived from heap memory, assuming an equal amount of memory
      is devoted to indexing as to merging. (This is also assumed when
      computing the setting of maxBytesInMemory.)
    - A figure derived from offheap memory, assuming we can use what isn't
      used for processing buffers.
    
    * Add test coverage.
    
    * Reword.
---
 .../druid/msq/exec/WorkerMemoryParameters.java     |  10 +-
 .../apache/druid/indexing/common/TaskToolbox.java  |  22 ++
 .../druid/indexing/common/TaskToolboxFactory.java  |   5 +
 .../SeekableStreamAppenderatorConfig.java          | 283 +++++++++++++++++++++
 .../seekablestream/SeekableStreamIndexTask.java    |   7 +-
 .../SeekableStreamIndexTaskTuningConfig.java       |  22 +-
 .../druid/indexing/common/TaskToolboxTest.java     |   2 +
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   2 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   2 +
 .../indexing/overlord/TestTaskToolboxFactory.java  |   9 +
 .../SeekableStreamAppenderatorConfigTest.java      | 263 +++++++++++++++++++
 .../SeekableStreamIndexTaskTestBase.java           |   2 +
 .../indexing/worker/WorkerTaskManagerTest.java     |   1 +
 .../indexing/worker/WorkerTaskMonitorTest.java     |   1 +
 .../org/apache/druid/segment/IndexMergerV9.java    |  14 +-
 .../realtime/appenderator/AppenderatorConfig.java  |   3 +-
 16 files changed, 619 insertions(+), 29 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index d7c9b9a1073..4eccd5fe317 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.processor.SuperSorter;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamAppenderatorConfig;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
@@ -80,11 +81,6 @@ public class WorkerMemoryParameters
    */
   private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
 
-  /**
-   * (Very) rough estimate of the on-heap overhead of reading a column.
-   */
-  private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
-
   /**
    * Maximum percent of each bundle's free memory that will be used for 
maxRetainedBytes of
    * {@link ClusterByStatisticsCollectorImpl}.
@@ -305,7 +301,9 @@ public class WorkerMemoryParameters
   public int getAppenderatorMaxColumnsToMerge()
   {
     // Half for indexing, half for merging.
-    return Ints.checkedCast(Math.max(2, getAppenderatorMemory() / 2 / 
APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN));
+    final long calculatedMaxColumnsToMerge =
+        getAppenderatorMemory() / 2 / 
SeekableStreamAppenderatorConfig.APPENDERATOR_MERGE_ROUGH_HEAP_MEMORY_PER_COLUMN;
+    return Ints.checkedCast(Math.max(2, calculatedMaxColumnsToMerge));
   }
 
   public int getFrameSize()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 9732d72eaf5..041ab12e790 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.rpc.indexing.OverlordClient;
@@ -99,6 +100,11 @@ public class TaskToolbox
    * because it may be unavailable, e. g. for batch tasks running in Spark or 
Hadoop.
    */
   private final Provider<QueryRunnerFactoryConglomerate> 
queryRunnerFactoryConglomerateProvider;
+  /**
+   * Using Provider, not {@link DruidProcessingConfig} directly, because it 
may not be available for task
+   * types that do not run queries.
+   */
+  private final Provider<DruidProcessingConfig> processingConfigProvider;
   @Nullable
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
   private final QueryProcessingPool queryProcessingPool;
@@ -148,6 +154,7 @@ public class TaskToolbox
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentHandoffNotifierFactory handoffNotifierFactory,
       Provider<QueryRunnerFactoryConglomerate> 
queryRunnerFactoryConglomerateProvider,
+      Provider<DruidProcessingConfig> processingConfigProvider,
       QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       @Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
@@ -191,6 +198,7 @@ public class TaskToolbox
     this.serverAnnouncer = serverAnnouncer;
     this.handoffNotifierFactory = handoffNotifierFactory;
     this.queryRunnerFactoryConglomerateProvider = 
queryRunnerFactoryConglomerateProvider;
+    this.processingConfigProvider = processingConfigProvider;
     this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
@@ -287,6 +295,11 @@ public class TaskToolbox
     return queryRunnerFactoryConglomerateProvider.get();
   }
 
+  public DruidProcessingConfig getProcessingConfig()
+  {
+    return processingConfigProvider.get();
+  }
+
   public QueryProcessingPool getQueryProcessingPool()
   {
     return queryProcessingPool;
@@ -536,6 +549,7 @@ public class TaskToolbox
     private DataSegmentServerAnnouncer serverAnnouncer;
     private SegmentHandoffNotifierFactory handoffNotifierFactory;
     private Provider<QueryRunnerFactoryConglomerate> 
queryRunnerFactoryConglomerateProvider;
+    private Provider<DruidProcessingConfig> processingConfigProvider;
     private QueryProcessingPool queryProcessingPool;
     private JoinableFactory joinableFactory;
     private Provider<MonitorScheduler> monitorSchedulerProvider;
@@ -584,6 +598,7 @@ public class TaskToolbox
       this.serverAnnouncer = other.serverAnnouncer;
       this.handoffNotifierFactory = other.handoffNotifierFactory;
       this.queryRunnerFactoryConglomerateProvider = 
other.queryRunnerFactoryConglomerateProvider;
+      this.processingConfigProvider = other.processingConfigProvider;
       this.queryProcessingPool = other.queryProcessingPool;
       this.joinableFactory = other.joinableFactory;
       this.monitorSchedulerProvider = other.monitorSchedulerProvider;
@@ -690,6 +705,12 @@ public class TaskToolbox
       return this;
     }
 
+    public Builder processingConfigProvider(final 
Provider<DruidProcessingConfig> processingConfigProvider)
+    {
+      this.processingConfigProvider = processingConfigProvider;
+      return this;
+    }
+
     public Builder queryProcessingPool(final QueryProcessingPool 
queryProcessingPool)
     {
       this.queryProcessingPool = queryProcessingPool;
@@ -874,6 +895,7 @@ public class TaskToolbox
           serverAnnouncer,
           handoffNotifierFactory,
           queryRunnerFactoryConglomerateProvider,
+          processingConfigProvider,
           queryProcessingPool,
           joinableFactory,
           monitorSchedulerProvider,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 7190c38849f..9084c330655 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -44,6 +44,7 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.rpc.StandardRetryPolicy;
@@ -88,6 +89,7 @@ public class TaskToolboxFactory
   private final DataSegmentServerAnnouncer serverAnnouncer;
   private final SegmentHandoffNotifierFactory handoffNotifierFactory;
   private final Provider<QueryRunnerFactoryConglomerate> 
queryRunnerFactoryConglomerateProvider;
+  private final Provider<DruidProcessingConfig> processingConfigProvider;
   private final QueryProcessingPool queryProcessingPool;
   private final JoinableFactory joinableFactory;
   private final Provider<MonitorScheduler> monitorSchedulerProvider;
@@ -133,6 +135,7 @@ public class TaskToolboxFactory
       DataSegmentServerAnnouncer serverAnnouncer,
       SegmentHandoffNotifierFactory handoffNotifierFactory,
       Provider<QueryRunnerFactoryConglomerate> 
queryRunnerFactoryConglomerateProvider,
+      Provider<DruidProcessingConfig> processingConfigProvider,
       QueryProcessingPool queryProcessingPool,
       JoinableFactory joinableFactory,
       Provider<MonitorScheduler> monitorSchedulerProvider,
@@ -175,6 +178,7 @@ public class TaskToolboxFactory
     this.serverAnnouncer = serverAnnouncer;
     this.handoffNotifierFactory = handoffNotifierFactory;
     this.queryRunnerFactoryConglomerateProvider = 
queryRunnerFactoryConglomerateProvider;
+    this.processingConfigProvider = processingConfigProvider;
     this.queryProcessingPool = queryProcessingPool;
     this.joinableFactory = joinableFactory;
     this.monitorSchedulerProvider = monitorSchedulerProvider;
@@ -231,6 +235,7 @@ public class TaskToolboxFactory
         .serverAnnouncer(serverAnnouncer)
         .handoffNotifierFactory(handoffNotifierFactory)
         
.queryRunnerFactoryConglomerateProvider(queryRunnerFactoryConglomerateProvider)
+        .processingConfigProvider(processingConfigProvider)
         .queryProcessingPool(queryProcessingPool)
         .joinableFactory(joinableFactory)
         .monitorSchedulerProvider(monitorSchedulerProvider)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
new file mode 100644
index 00000000000..f6490579c75
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.utils.RuntimeInfo;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+public class SeekableStreamAppenderatorConfig implements AppenderatorConfig
+{
+  private static final Logger log = new 
Logger(SeekableStreamAppenderatorConfig.class);
+
+  /**
+   * Rough estimate of the on-heap overhead of reading a column. Determined 
empirically through heap dumps.
+   */
+  public static final int APPENDERATOR_MERGE_ROUGH_HEAP_MEMORY_PER_COLUMN = 
3_000;
+
+  /**
+   * Rough estimate of the off-heap overhead of reading a column. This is 1.5x 
the size of a decompression buffer.
+   * The 1.5x allows for some headroom for unaccounted-for memory usage. The 
decompression buffer size is about right
+   * when a compressing {@link IndexSpec} is in play. It is an overestimate 
when compression is disabled for
+   * intermediate persists, but this isn't currently the default behavior, so 
we're wanting to be conservative here.
+   *
+   * See also {@link AppenderatorConfig#getIndexSpecForIntermediatePersists()} 
for how compression can be disabled
+   * for intermediate persists.
+   */
+  public static final int APPENDERATOR_MERGE_ROUGH_DIRECT_MEMORY_PER_COLUMN = 
(int) ((1 << 16) * 1.5);
+
+  private final SeekableStreamIndexTaskTuningConfig tuningConfig;
+  private final int maxColumnsToMerge;
+
+  private SeekableStreamAppenderatorConfig(SeekableStreamIndexTaskTuningConfig 
tuningConfig, int maxColumnsToMerge)
+  {
+    this.tuningConfig = tuningConfig;
+    this.maxColumnsToMerge = maxColumnsToMerge;
+  }
+
+  public static SeekableStreamAppenderatorConfig fromTuningConfig(
+      final SeekableStreamIndexTaskTuningConfig tuningConfig,
+      @Nullable final DruidProcessingConfig processingConfig
+  )
+  {
+    if (processingConfig == null) {
+      return new SeekableStreamAppenderatorConfig(tuningConfig, 
tuningConfig.getMaxColumnsToMerge());
+    } else {
+      final int maxColumnsToMerge;
+      if (tuningConfig.getMaxColumnsToMerge() == 
SeekableStreamIndexTaskTuningConfig.DEFAULT_MAX_COLUMNS_TO_MERGE) {
+        maxColumnsToMerge = calculateDefaultMaxColumnsToMerge(
+            JvmUtils.getRuntimeInfo(),
+            processingConfig,
+            tuningConfig
+        );
+      } else {
+        maxColumnsToMerge = tuningConfig.getMaxColumnsToMerge();
+      }
+
+      return new SeekableStreamAppenderatorConfig(tuningConfig, 
maxColumnsToMerge);
+    }
+  }
+
+  @Override
+  public boolean isReportParseExceptions()
+  {
+    return tuningConfig.isReportParseExceptions();
+  }
+
+  @Override
+  public int getMaxPendingPersists()
+  {
+    return tuningConfig.getMaxPendingPersists();
+  }
+
+  @Override
+  public boolean isSkipBytesInMemoryOverheadCheck()
+  {
+    return tuningConfig.isSkipBytesInMemoryOverheadCheck();
+  }
+
+  @Override
+  public Period getIntermediatePersistPeriod()
+  {
+    return tuningConfig.getIntermediatePersistPeriod();
+  }
+
+  @Override
+  public File getBasePersistDirectory()
+  {
+    return tuningConfig.getBasePersistDirectory();
+  }
+
+  @Override
+  public AppenderatorConfig withBasePersistDirectory(File basePersistDirectory)
+  {
+    return new SeekableStreamAppenderatorConfig(
+        tuningConfig.withBasePersistDirectory(basePersistDirectory),
+        maxColumnsToMerge
+    );
+  }
+
+  @Override
+  public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+  {
+    return tuningConfig.getSegmentWriteOutMediumFactory();
+  }
+
+  @Override
+  public AppendableIndexSpec getAppendableIndexSpec()
+  {
+    return tuningConfig.getAppendableIndexSpec();
+  }
+
+  @Override
+  public int getMaxRowsInMemory()
+  {
+    return tuningConfig.getMaxRowsInMemory();
+  }
+
+  @Override
+  public long getMaxBytesInMemory()
+  {
+    return tuningConfig.getMaxBytesInMemory();
+  }
+
+  @Override
+  public PartitionsSpec getPartitionsSpec()
+  {
+    return tuningConfig.getPartitionsSpec();
+  }
+
+  @Override
+  public IndexSpec getIndexSpec()
+  {
+    return tuningConfig.getIndexSpec();
+  }
+
+  @Override
+  public IndexSpec getIndexSpecForIntermediatePersists()
+  {
+    return tuningConfig.getIndexSpecForIntermediatePersists();
+  }
+
+  @Override
+  public int getNumPersistThreads()
+  {
+    return tuningConfig.getNumPersistThreads();
+  }
+
+  @Override
+  public Integer getMaxRowsPerSegment()
+  {
+    return tuningConfig.getMaxRowsPerSegment();
+  }
+
+  @Override
+  public Long getMaxTotalRows()
+  {
+    return tuningConfig.getMaxTotalRows();
+  }
+
+  @Override
+  public int getMaxColumnsToMerge()
+  {
+    return maxColumnsToMerge;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SeekableStreamAppenderatorConfig that = (SeekableStreamAppenderatorConfig) 
o;
+    return maxColumnsToMerge == that.maxColumnsToMerge && 
Objects.equals(tuningConfig, that.tuningConfig);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(tuningConfig, maxColumnsToMerge);
+  }
+
+  /**
+   * Calculates the value to use for "maxColumnsToMerge" when it has been set 
to
+   * {@link SeekableStreamIndexTaskTuningConfig#DEFAULT_MAX_COLUMNS_TO_MERGE}.
+   */
+  static int calculateDefaultMaxColumnsToMerge(
+      final RuntimeInfo runtimeInfo,
+      final DruidProcessingConfig processingConfig,
+      final TuningConfig tuningConfig
+  )
+  {
+    final int maxColumnsToMergeBasedOnHeapMemory = 
getMaxColumnsToMergeBasedOnHeapMemory(tuningConfig);
+    final int maxColumnsToMergeBasedOnDirectMemory =
+        getMaxColumnsToMergeBasedOnDirectMemory(runtimeInfo, processingConfig);
+
+    return Math.min(maxColumnsToMergeBasedOnHeapMemory, 
maxColumnsToMergeBasedOnDirectMemory);
+  }
+
+  /**
+   * Calculates the value to use for "maxColumnsToMerge" based purely on heap 
memory.
+   */
+  static int getMaxColumnsToMergeBasedOnHeapMemory(TuningConfig tuningConfig)
+  {
+    final long heapMemoryForMerging;
+
+    if (tuningConfig.getMaxBytesInMemory() >= 0) {
+      // maxBytesInMemory is active and represents the amount of heap memory 
used for indexing.
+      // Use an equal amount of heap memory for merging.
+      heapMemoryForMerging = tuningConfig.getMaxBytesInMemoryOrDefault();
+    } else {
+      // maxBytesInMemory was set to unlimited by the user; use what that 
_would_ be if it was set to automatic.
+      heapMemoryForMerging = 
tuningConfig.getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    }
+
+    return (int) Math.min(heapMemoryForMerging / 
APPENDERATOR_MERGE_ROUGH_HEAP_MEMORY_PER_COLUMN, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Calculates the value to use for "maxColumnsToMerge" based purely on 
direct memory.
+   */
+  static int getMaxColumnsToMergeBasedOnDirectMemory(
+      final RuntimeInfo runtimeInfo,
+      final DruidProcessingConfig processingConfig
+  )
+  {
+    // Queries require one processing buffer per thread, plus all the merge 
buffers.
+    final long directMemoryForQueries =
+        (long) processingConfig.intermediateComputeSizeBytes() *
+        (processingConfig.getNumThreads() + 
processingConfig.getNumMergeBuffers());
+
+    long directMemorySizeBytes;
+
+    try {
+      directMemorySizeBytes = runtimeInfo.getDirectMemorySizeBytes();
+    }
+    catch (UnsupportedOperationException e) {
+      // Cannot find direct memory, assume it was configured according to our 
guidelines (at least big enough to
+      // hold one extra processing buffer).
+      directMemorySizeBytes = directMemoryForQueries + 
processingConfig.intermediateComputeSizeBytes();
+
+      log.noStackTrace().warn(
+          e,
+          "Ignoring direct memory when sizing maxColumnsToMerge; cannot 
retrieve. "
+          + "Assuming direct memory is at least [%,d] bytes.",
+          directMemorySizeBytes
+      );
+    }
+
+    return (int) Math.min(
+        (directMemorySizeBytes - directMemoryForQueries) / 
APPENDERATOR_MERGE_ROUGH_DIRECT_MEMORY_PER_COLUMN,
+        Integer.MAX_VALUE
+    );
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 1abff409229..2550408ddc0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -41,7 +41,6 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -62,7 +61,6 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     extends AbstractTask implements ChatHandler, PendingSegmentAllocatingTask
 {
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
-  private static final EmittingLogger log = new 
EmittingLogger(SeekableStreamIndexTask.class);
 
   protected final DataSchema dataSchema;
   protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
@@ -195,7 +193,10 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
         toolbox.getSegmentLoaderConfig(),
         getId(),
         dataSchema,
-        tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
+        SeekableStreamAppenderatorConfig.fromTuningConfig(
+            tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
+            toolbox.getProcessingConfig()
+        ),
         metrics,
         toolbox.getSegmentPusher(),
         toolbox.getJsonMapper(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 309336e1d53..124682bcb2e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -33,15 +33,15 @@ import java.io.File;
 import java.time.Duration;
 import java.util.Objects;
 
-public abstract class SeekableStreamIndexTaskTuningConfig implements 
AppenderatorConfig
+public abstract class SeekableStreamIndexTaskTuningConfig implements 
TuningConfig
 {
+  public static final int DEFAULT_MAX_COLUMNS_TO_MERGE = -1;
   private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
   private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK 
= false;
   private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new 
Period("PT10M");
   private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
   private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
   private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 
Duration.ofMinutes(15).toMillis();
-  private static final int DEFAULT_MAX_COLUMNS_TO_MERGE = -1;
 
   private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
@@ -140,8 +140,11 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
     this.logParseExceptions = logParseExceptions == null
                               ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
                               : logParseExceptions;
-    this.numPersistThreads = numPersistThreads == null ?
-                                DEFAULT_NUM_PERSIST_THREADS : 
Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS);
+    if (numPersistThreads == null) {
+      this.numPersistThreads = AppenderatorConfig.DEFAULT_NUM_PERSIST_THREADS;
+    } else {
+      this.numPersistThreads = Math.max(numPersistThreads, 
AppenderatorConfig.DEFAULT_NUM_PERSIST_THREADS);
+    }
     this.maxColumnsToMerge = maxColumnsToMerge == null ? 
DEFAULT_MAX_COLUMNS_TO_MERGE : maxColumnsToMerge;
   }
 
@@ -167,13 +170,11 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
   }
 
   @JsonProperty
-  @Override
   public boolean isSkipBytesInMemoryOverheadCheck()
   {
     return skipBytesInMemoryOverheadCheck;
   }
 
-  @Override
   @JsonProperty
   public Integer getMaxRowsPerSegment()
   {
@@ -181,7 +182,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
   }
 
   @JsonProperty
-  @Override
   @Nullable
   public Long getMaxTotalRows()
   {
@@ -194,20 +194,17 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
     return partitionsSpec;
   }
 
-  @Override
   @JsonProperty
   public Period getIntermediatePersistPeriod()
   {
     return intermediatePersistPeriod;
   }
 
-  @Override
   public File getBasePersistDirectory()
   {
     return basePersistDirectory;
   }
 
-  @Override
   @JsonProperty
   @Deprecated
   public int getMaxPendingPersists()
@@ -229,7 +226,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
     return indexSpecForIntermediatePersists;
   }
 
-  @Override
   @JsonProperty
   public boolean isReportParseExceptions()
   {
@@ -248,7 +244,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
     return resetOffsetAutomatically;
   }
 
-  @Override
   @JsonProperty
   @Nullable
   public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
@@ -286,21 +281,18 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
     return skipSequenceNumberAvailabilityCheck;
   }
 
-  @Override
   @JsonProperty
   public int getNumPersistThreads()
   {
     return numPersistThreads;
   }
 
-  @Override
   @JsonProperty
   public int getMaxColumnsToMerge()
   {
     return maxColumnsToMerge;
   }
 
-  @Override
   public abstract SeekableStreamIndexTaskTuningConfig 
withBasePersistDirectory(File dir);
 
   @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 25aa3c4a089..8f00f7961ff 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -35,6 +35,7 @@ import 
org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -130,6 +131,7 @@ public class TaskToolboxTest
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         mockHandoffNotifierFactory,
         () -> mockQueryRunnerFactoryConglomerate,
+        DruidProcessingConfig::new,
         mockQueryProcessingPool,
         NoopJoinableFactory.INSTANCE,
         () -> mockMonitorScheduler,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 5590ce608ec..e5be345fb9e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.scan.ScanResultValue;
@@ -111,6 +112,7 @@ public class SingleTaskBackgroundRunnerTest
         null,
         null,
         null,
+        DruidProcessingConfig::new,
         null,
         NoopJoinableFactory.INSTANCE,
         null,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index e6535fdadc9..75bdd55d395 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -106,6 +106,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.DirectQueryProcessingPool;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
@@ -586,6 +587,7 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         () -> queryRunnerFactoryConglomerate, // query runner factory 
conglomerate corporation unionized collective
+        DruidProcessingConfig::new,
         DirectQueryProcessingPool.INSTANCE, // query executor service
         NoopJoinableFactory.INSTANCE,
         () -> monitorScheduler, // monitor scheduler
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
index 3f3081b0e69..b56da16de0c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
@@ -41,6 +41,7 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.rpc.indexing.NoopOverlordClient;
@@ -92,6 +93,7 @@ public class TestTaskToolboxFactory extends TaskToolboxFactory
         bob.serverAnnouncer,
         bob.handoffNotifierFactory,
         bob.queryRunnerFactoryConglomerateProvider,
+        bob.processingConfigProvider,
         bob.queryProcessingPool,
         bob.joinableFactory,
         bob.monitorSchedulerProvider,
@@ -136,6 +138,7 @@ public class TestTaskToolboxFactory extends 
TaskToolboxFactory
     private DataSegmentServerAnnouncer serverAnnouncer;
     private SegmentHandoffNotifierFactory handoffNotifierFactory;
     private Provider<QueryRunnerFactoryConglomerate> 
queryRunnerFactoryConglomerateProvider;
+    private Provider<DruidProcessingConfig> processingConfigProvider;
     private QueryProcessingPool queryProcessingPool;
     private JoinableFactory joinableFactory;
     private Provider<MonitorScheduler> monitorSchedulerProvider;
@@ -236,6 +239,12 @@ public class TestTaskToolboxFactory extends 
TaskToolboxFactory
       return this;
     }
 
+    public Builder setProcessingConfigProvider(Provider<DruidProcessingConfig> 
processingConfigProvider)
+    {
+      this.processingConfigProvider = processingConfigProvider;
+      return this;
+    }
+
     public Builder setQueryProcessingPool(QueryProcessingPool 
queryProcessingPool)
     {
       this.queryProcessingPool = queryProcessingPool;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
new file mode 100644
index 00000000000..2008ee09360
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.query.DruidProcessingBufferConfig;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.DruidProcessingConfigTest;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.AppendableIndexBuilder;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.utils.RuntimeInfo;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+
+public class SeekableStreamAppenderatorConfigTest
+{
+  @Test
+  public void test_fromTuningConfig()
+  {
+    final SeekableStreamIndexTaskTuningConfig tuningConfig = 
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
+    Mockito.when(tuningConfig.isReportParseExceptions()).thenReturn(true);
+    Mockito.when(tuningConfig.getMaxPendingPersists()).thenReturn(2);
+    
Mockito.when(tuningConfig.isSkipBytesInMemoryOverheadCheck()).thenReturn(true);
+    
Mockito.when(tuningConfig.getIntermediatePersistPeriod()).thenReturn(Period.days(3));
+    Mockito.when(tuningConfig.getBasePersistDirectory()).thenReturn(new 
File("/nonexistent/tmp"));
+    Mockito.when(tuningConfig.getMaxRowsInMemory()).thenReturn(11);
+    Mockito.when(tuningConfig.getMaxBytesInMemory()).thenReturn(12L);
+    Mockito.when(tuningConfig.getIndexSpec())
+           
.thenReturn(IndexSpec.builder().withMetricCompression(CompressionStrategy.NONE).build());
+    Mockito.when(tuningConfig.getIndexSpecForIntermediatePersists())
+           
.thenReturn(IndexSpec.builder().withMetricCompression(CompressionStrategy.NONE).build());
+    Mockito.when(tuningConfig.getNumPersistThreads()).thenReturn(13);
+    Mockito.when(tuningConfig.getMaxRowsPerSegment()).thenReturn(14);
+    Mockito.when(tuningConfig.getMaxTotalRows()).thenReturn(15L);
+    Mockito.when(tuningConfig.getMaxColumnsToMerge()).thenReturn(16);
+    
Mockito.when(tuningConfig.isSkipBytesInMemoryOverheadCheck()).thenReturn(true);
+    Mockito.when(tuningConfig.getMaxColumnsToMerge()).thenReturn(123);
+
+    final SeekableStreamAppenderatorConfig appenderatorConfig =
+        SeekableStreamAppenderatorConfig.fromTuningConfig(tuningConfig, null);
+
+    Assert.assertEquals(tuningConfig.isReportParseExceptions(), 
appenderatorConfig.isReportParseExceptions());
+    Assert.assertEquals(tuningConfig.getMaxPendingPersists(), 
appenderatorConfig.getMaxPendingPersists());
+    Assert.assertEquals(
+        tuningConfig.isSkipBytesInMemoryOverheadCheck(),
+        appenderatorConfig.isSkipBytesInMemoryOverheadCheck()
+    );
+    Assert.assertEquals(tuningConfig.getIntermediatePersistPeriod(), 
appenderatorConfig.getIntermediatePersistPeriod());
+    Assert.assertEquals(tuningConfig.getBasePersistDirectory(), 
appenderatorConfig.getBasePersistDirectory());
+    Assert.assertEquals(tuningConfig.getMaxRowsInMemory(), 
appenderatorConfig.getMaxRowsInMemory());
+    Assert.assertEquals(tuningConfig.getMaxBytesInMemory(), 
appenderatorConfig.getMaxBytesInMemory());
+    Assert.assertEquals(tuningConfig.getIndexSpec(), 
appenderatorConfig.getIndexSpec());
+    Assert.assertEquals(
+        tuningConfig.getIndexSpecForIntermediatePersists(),
+        appenderatorConfig.getIndexSpecForIntermediatePersists()
+    );
+    Assert.assertEquals(tuningConfig.getNumPersistThreads(), 
appenderatorConfig.getNumPersistThreads());
+    Assert.assertEquals(tuningConfig.getMaxRowsPerSegment(), 
appenderatorConfig.getMaxRowsPerSegment());
+    Assert.assertEquals(tuningConfig.getMaxTotalRows(), 
appenderatorConfig.getMaxTotalRows());
+    Assert.assertEquals(tuningConfig.getMaxColumnsToMerge(), 
appenderatorConfig.getMaxColumnsToMerge());
+    Assert.assertEquals(
+        tuningConfig.isSkipBytesInMemoryOverheadCheck(),
+        appenderatorConfig.isSkipBytesInMemoryOverheadCheck()
+    );
+    Assert.assertEquals(tuningConfig.getMaxColumnsToMerge(), 
appenderatorConfig.getMaxColumnsToMerge());
+  }
+
+  @Test
+  public void 
test_calculateDefaultMaxColumnsToMerge_direct2g_xmx1g_maxBytesAuto_2proc_1merge()
+  {
+    Assert.assertEquals(
+        17293,
+        SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+            new DruidProcessingConfigTest.MockRuntimeInfo(2, 2_000_000_000L, 
1_000_000_000L),
+            new MockProcessingConfig(2, 1, 100_000_000),
+            new MockTuningConfig(0, 500_000_000L)
+        )
+    );
+  }
+
+  @Test
+  public void 
test_calculateDefaultMaxColumnsToMerge_direct2g_xmx1g_maxBytesUnlimited_2proc_1merge()
+  {
+    Assert.assertEquals(
+        17293,
+        SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+            new DruidProcessingConfigTest.MockRuntimeInfo(2, 2_000_000_000L, 
1_000_000_000L),
+            new MockProcessingConfig(2, 1, 100_000_000),
+            new MockTuningConfig(-1, 500_000_000L)
+        )
+    );
+  }
+
+  @Test
+  public void 
test_calculateDefaultMaxColumnsToMerge_direct1800m_xmx1g_maxBytesUnlimited_2proc_1merge()
+  {
+    Assert.assertEquals(
+        15258,
+        SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+            new DruidProcessingConfigTest.MockRuntimeInfo(2, 1_800_000_000L, 
1_000_000_000L),
+            new MockProcessingConfig(2, 1, 100_000_000),
+            new MockTuningConfig(-1, 500_000_000L)
+        )
+    );
+  }
+
+  @Test
+  public void 
test_calculateDefaultMaxColumnsToMerge_direct1800m_xmx1g_maxBytesUnlimited_3proc_2merge()
+  {
+    Assert.assertEquals(
+        13224,
+        SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+            new DruidProcessingConfigTest.MockRuntimeInfo(3, 1_800_000_000L, 
1_000_000_000L),
+            new MockProcessingConfig(3, 2, 100_000_000),
+            new MockTuningConfig(-1, 500_000_000L)
+        )
+    );
+  }
+
+  @Test
+  public void 
test_calculateDefaultMaxColumnsToMerge_direct2g_xmx1g_maxBytes20m_2proc_1merge()
+  {
+    Assert.assertEquals(
+        6666,
+        SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+            new DruidProcessingConfigTest.MockRuntimeInfo(2, 2_000_000_000L, 
1_000_000_000L),
+            new MockProcessingConfig(2, 1, 100_000_000),
+            new MockTuningConfig(20_000_000L, 500_000_000L)
+        )
+    );
+  }
+
+  @Test
+  public void 
test_calculateDefaultMaxColumnsToMerge_directUnsupported_xmx1g_maxBytes20m_2proc_1merge()
+  {
+    Assert.assertEquals(
+        1017,
+        SeekableStreamAppenderatorConfig.calculateDefaultMaxColumnsToMerge(
+            new RuntimeInfo()
+            {
+              @Override
+              public long getDirectMemorySizeBytes()
+              {
+                throw new UnsupportedOperationException();
+              }
+            },
+            new MockProcessingConfig(2, 1, 100_000_000),
+            new MockTuningConfig(20_000_000L, 500_000_000L)
+        )
+    );
+  }
+
+  @Test
+  public void test_equals()
+  {
+    EqualsVerifier.forClass(SeekableStreamAppenderatorConfig.class)
+                  .usingGetClass()
+                  .verify();
+  }
+
+  private static class MockProcessingConfig extends DruidProcessingConfig
+  {
+    public MockProcessingConfig(final int numThreads, final int 
numMergeBuffers, final int bufferSize)
+    {
+      super(
+          null,
+          numThreads,
+          numMergeBuffers,
+          null,
+          null,
+          new 
DruidProcessingBufferConfig(HumanReadableBytes.valueOf(bufferSize), null, null),
+          null
+      );
+    }
+  }
+
+  private static class MockTuningConfig implements TuningConfig
+  {
+    private final long configuredMaxBytesInMemory;
+    private final long defaultMaxBytesInMemory;
+
+    public MockTuningConfig(long configuredMaxBytesInMemory, long 
defaultMaxBytesInMemory)
+    {
+      this.configuredMaxBytesInMemory = configuredMaxBytesInMemory;
+      this.defaultMaxBytesInMemory = defaultMaxBytesInMemory;
+    }
+
+    @Override
+    public AppendableIndexSpec getAppendableIndexSpec()
+    {
+      return new AppendableIndexSpec()
+      {
+        @Override
+        public AppendableIndexBuilder builder()
+        {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getDefaultMaxBytesInMemory()
+        {
+          return defaultMaxBytesInMemory;
+        }
+      };
+    }
+
+    @Override
+    public int getMaxRowsInMemory()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxBytesInMemory()
+    {
+      return configuredMaxBytesInMemory;
+    }
+
+    @Override
+    public PartitionsSpec getPartitionsSpec()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IndexSpec getIndexSpec()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IndexSpec getIndexSpecForIntermediatePersists()
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 9e82196f811..8c0bdf56ed5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -91,6 +91,7 @@ import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
 import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
 import org.apache.druid.query.DirectQueryProcessingPool;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -683,6 +684,7 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         this::makeQueryRunnerConglomerate,
+        DruidProcessingConfig::new,
         DirectQueryProcessingPool.INSTANCE,
         NoopJoinableFactory.INSTANCE,
         () -> EasyMock.createMock(MonitorScheduler.class),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 37839f8e077..930e2fc7d7c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -143,6 +143,7 @@ public class WorkerTaskManagerTest
                 notifierFactory,
                 null,
                 null,
+                null,
                 NoopJoinableFactory.INSTANCE,
                 null,
                 new SegmentCacheManagerFactory(TestIndex.INDEX_IO, jsonMapper),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index c8a75462535..8e6577a86e8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -185,6 +185,7 @@ public class WorkerTaskMonitorTest
                 notifierFactory,
                 null,
                 null,
+                null,
                 NoopJoinableFactory.INSTANCE,
                 null,
                 new SegmentCacheManagerFactory(TestIndex.INDEX_IO, jsonMapper),
diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 658242a6ed7..4afd7afdc0e 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -1204,12 +1204,18 @@ public class IndexMergerV9 implements IndexMerger
     List<List<IndexableAdapter>> currentPhases = getMergePhases(indexes, 
maxColumnsToMerge);
     List<File> currentOutputs = new ArrayList<>();
 
-    log.debug("base outDir: " + outDir);
+    log.debug("Base outDir[%s]", outDir);
 
     try {
       int tierCounter = 0;
       while (true) {
-        log.info("Merging %d phases, tiers finished processed so far: %d.", 
currentPhases.size(), tierCounter);
+        log.info(
+            "Merging phases[%,d] (indexes[%,d], maxColumnsToMerge[%,d]), tiers 
finished processed so far[%,d].",
+            currentPhases.size(),
+            indexes.size(),
+            maxColumnsToMerge,
+            tierCounter
+        );
         for (List<IndexableAdapter> phase : currentPhases) {
           final File phaseOutDir;
           final boolean isFinalPhase = currentPhases.size() == 1;
@@ -1221,8 +1227,8 @@ public class IndexMergerV9 implements IndexMerger
             phaseOutDir = FileUtils.createTempDir();
             tempDirs.add(phaseOutDir);
           }
-          log.info("Merging phase with %d indexes.", phase.size());
-          log.debug("phase outDir: " + phaseOutDir);
+          log.info("Merging phase with index count[%,d].", phase.size());
+          log.debug("Phase outDir[%s]", phaseOutDir);
 
           File phaseOutput = merge(
               phase,
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index ab1b09b772c..7e5f1b3f25f 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.realtime.appenderator;
 
+import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
@@ -74,6 +75,6 @@ public interface AppenderatorConfig extends TuningConfig
 
   default int getMaxColumnsToMerge()
   {
-    return -1;
+    return IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to