This is an automated email from the ASF dual-hosted git repository.

agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f3d7a4c07 Emit state of replace and append for native batch tasks 
(#12488)
2f3d7a4c07 is described below

commit 2f3d7a4c076316797b1cef5fc31c4ab5ac67cf59
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Mon May 23 12:32:47 2022 -0700

    Emit state of replace and append for native batch tasks (#12488)
    
    * Emit state of replace and append for native batch tasks
    
    * Emit count of one depending on batch ingestion mode (APPEND, OVERWRITE, 
REPLACE)
    
    * Add metric to compaction job
    
    * Avoid null ptr exc when null emitter
    
    * Coverage
    
    * Emit tombstone & segment counts
    
    * Tasks need a type
    
    * Spelling
    
    * Integrate BatchIngestionMode in batch ingestion tasks functionality
    
    * Typos
    
    * Remove batch ingestion type from metric since it is already in a 
dimension. Move IngestionMode to AbstractTask to facilitate having mode as a 
dimension. Add metrics to streaming. Add missing coverage.
    
    * Avoid inner class referenced by sub-class inspection. Refactor 
computation of IngestionMode to make it more robust to null IOConfig and fix 
test.
    
    * Spelling
    
    * Avoid polluting the Task interface
    
    * Rename computeCompaction methods to avoid ambiguous java compiler error 
if they are passed null. Other minor cleanup.
---
 docs/operations/metrics.md                         |  28 ++++++
 .../common/task/AbstractBatchIndexTask.java        |  27 +++--
 .../druid/indexing/common/task/AbstractTask.java   | 110 ++++++++++++++++++++-
 .../indexing/common/task/CompactionIOConfig.java   |   3 +-
 .../druid/indexing/common/task/CompactionTask.java |  27 ++++-
 .../druid/indexing/common/task/HadoopTask.java     |   2 +-
 .../druid/indexing/common/task/IndexTask.java      |  77 ++++++++++-----
 .../druid/indexing/common/task/IndexTaskUtils.java |   8 ++
 .../task/OverlordCoordinatingSegmentAllocator.java |   9 +-
 .../indexing/common/task/SegmentAllocators.java    |   6 +-
 .../task/batch/parallel/AbstractBatchSubtask.java  |  10 +-
 .../parallel/ParallelIndexSupervisorTask.java      |  48 ++++++---
 .../batch/parallel/PerfectRollupWorkerTask.java    |   2 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |  10 +-
 .../seekablestream/SeekableStreamIndexTask.java    |   5 +-
 .../SeekableStreamIndexTaskRunner.java             |  11 +++
 .../indexing/common/task/AbstractTaskTest.java     |  63 ++++++++++++
 .../indexing/common/task/CompactionTaskTest.java   |  88 +++++++++++------
 .../druid/indexing/common/task/IndexTaskTest.java  |   4 +-
 .../druid/indexing/common/task/TaskSerdeTest.java  |   2 +
 .../ParallelIndexSupervisorTaskResourceTest.java   |   4 +-
 .../parallel/PerfectRollupWorkerTaskTest.java      |   2 +-
 .../druid/indexing/overlord/TaskQueueTest.java     |   2 +-
 .../java/org/apache/druid/query/DruidMetrics.java  |   1 +
 .../client/indexing/ClientCompactionIOConfig.java  |   4 +-
 .../druid/segment/indexing/BatchIOConfig.java      |   7 ++
 .../apache/druid/segment/indexing/IOConfig.java    |   1 -
 .../coordinator/UserCompactionTaskIOConfig.java    |   3 +-
 website/.spelling                                  |   1 +
 29 files changed, 450 insertions(+), 115 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index eb86c91702..59fbb9f908 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -144,6 +144,31 @@ If SQL is enabled, the Broker will emit the following 
metrics for SQL.
 
 ## Ingestion metrics
 
+### General native ingestion metrics
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`ingest/count`|Count of `1` every time an ingestion job runs (includes 
compaction jobs). Aggregate using dimensions. |dataSource, taskId, taskType, 
taskIngestionMode|Always `1`.|
+|`ingest/segments/count`|Count of final segments created by job (includes 
tombstones). |dataSource, taskId, taskType, taskIngestionMode|At least `1`.|
+|`ingest/tombstones/count`|Count of tombstones created by job. |dataSource, 
taskId, taskType, taskIngestionMode|Zero or more for `REPLACE` tasks. Always 
zero for all other tasks, including `REPLACE_LEGACY` (see below).|
+
+The `taskIngestionMode` dimension includes the following modes: 
+`APPEND`, `REPLACE_LEGACY`, and `REPLACE`. The `APPEND` mode indicates a native
+ingestion job that is appending to existing segments; `REPLACE` indicates a native
+ingestion job that is replacing existing segments using tombstones;
+and `REPLACE_LEGACY` indicates the original replace behavior from before tombstones.
+
+The mode is decided using the values
+of the `isAppendToExisting` and `isDropExisting` flags in the
+task's `IOConfig` as follows:
+
+|`isAppendToExisting` | `isDropExisting` | mode |
+|---------------------|-------------------|------|
+`true` | `false` | `APPEND`|
+`true` | `true` | Invalid combination, exception thrown. |
+`false` | `false` | `REPLACE_LEGACY` (this is the default for native batch 
ingestion). |
+`false` | `true` | `REPLACE`|
+
 ### Ingestion metrics for Kafka
 
 These metrics apply to the [Kafka indexing 
service](../development/extensions-core/kafka-ingestion.md).
@@ -221,6 +246,8 @@ Note: If the JVM does not support CPU time measurement for 
the current thread, i
 |`worker/taskSlot/total/count`|Number of total task slots on the reporting 
worker per emission period. This metric is only available if the 
WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|
 |`worker/taskSlot/used/count`|Number of busy task slots on the reporting 
worker per emission period. This metric is only available if the 
WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|
 
+
+
 ## Shuffle metrics (Native parallel task)
 
 The shuffle metrics can be enabled by adding 
`org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` in 
`druid.monitoring.monitors`
@@ -231,6 +258,7 @@ See [Enabling 
Metrics](../configuration/index.md#enabling-metrics) for more deta
 |`ingest/shuffle/bytes`|Number of bytes shuffled per emission 
period.|supervisorTaskId|Varies|
 |`ingest/shuffle/requests`|Number of shuffle requests per emission 
period.|supervisorTaskId|Varies|
 
+
 ## Coordination
 
 These metrics are for the Druid Coordinator and are reset each time the 
Coordinator runs the coordination logic.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 44de1c93cc..8fcadc301d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -127,9 +127,9 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
   // Store lock versions
   Map<Interval, String> intervalToVersion = new HashMap<>();
 
-  protected AbstractBatchIndexTask(String id, String dataSource, Map<String, 
Object> context)
+  protected AbstractBatchIndexTask(String id, String dataSource, Map<String, 
Object> context, IngestionMode ingestionMode)
   {
-    super(id, dataSource, context);
+    super(id, dataSource, context, ingestionMode);
     maxAllowedLockCount = -1;
   }
 
@@ -139,10 +139,11 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
       @Nullable TaskResource taskResource,
       String dataSource,
       @Nullable Map<String, Object> context,
-      int maxAllowedLockCount
+      int maxAllowedLockCount,
+      IngestionMode ingestionMode
   )
   {
-    super(id, groupId, taskResource, dataSource, context);
+    super(id, groupId, taskResource, dataSource, context, ingestionMode);
     this.maxAllowedLockCount = maxAllowedLockCount;
   }
 
@@ -306,11 +307,14 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
         Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
         Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
     );
-    final boolean useSharedLock = ioConfig.isAppendToExisting() && 
getContextValue(Tasks.USE_SHARED_LOCK, false);
+    IngestionMode ingestionMode = getIngestionMode();
+    final boolean useSharedLock = ingestionMode == IngestionMode.APPEND
+                                  && getContextValue(Tasks.USE_SHARED_LOCK, 
false);
     // Respect task context value most.
-    if (forceTimeChunkLock || ioConfig.isDropExisting()) {
-      log.info("forceTimeChunkLock[%s] or isDropExisting[%s] is set to true. 
Use timeChunk lock",
-               forceTimeChunkLock, ioConfig.isDropExisting()
+    if (forceTimeChunkLock || ingestionMode == IngestionMode.REPLACE) {
+      log.info(
+          "forceTimeChunkLock[%s] is set to true or mode[%s] is replace. Use 
timeChunk lock",
+          forceTimeChunkLock, ingestionMode
       );
       taskLockHelper = new TaskLockHelper(false, useSharedLock);
       if (!intervals.isEmpty()) {
@@ -505,10 +509,13 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
    * the start partition ID of the set of perfectly rolled up segments is 0. 
Instead it might need to store an ordinal
    * in addition to the partition ID which represents the ordinal in the 
perfectly rolled up segment set.
    */
-  public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, 
IndexTuningConfig tuningConfig)
+  public static boolean isGuaranteedRollup(
+      IngestionMode ingestionMode,
+      IndexTuningConfig tuningConfig
+  )
   {
     Preconditions.checkArgument(
-        !tuningConfig.isForceGuaranteedRollup() || 
!ioConfig.isAppendToExisting(),
+        !(ingestionMode == IngestionMode.APPEND && 
tuningConfig.isForceGuaranteedRollup()),
         "Perfect rollup cannot be guaranteed when appending to existing 
dataSources"
     );
     return tuningConfig.isForceGuaranteedRollup();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 23adeb6ff8..8afcc09005 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
@@ -28,10 +29,16 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.segment.indexing.BatchIOConfig;
 import org.joda.time.Interval;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.HashMap;
@@ -40,6 +47,28 @@ import java.util.Map;
 
 public abstract class AbstractTask implements Task
 {
+
+  // This is mainly to avoid using combinations of IOConfig flags to figure 
out the ingestion mode and
+  // also to use the mode as dimension in metrics
+  public enum IngestionMode
+  {
+    REPLACE, // replace with tombstones
+    APPEND, // append to existing segments
+    REPLACE_LEGACY, // original replace, it does not replace existing data for 
empty time chunks in input intervals
+    HADOOP, // non-native batch, hadoop ingestion
+    NONE; // not an ingestion task (i.e. a kill task)
+
+    @JsonCreator
+    public static IngestionMode fromString(String name)
+    {
+      if (name == null) {
+        return null;
+      }
+      return valueOf(StringUtils.toUpperCase(name));
+    }
+  }
+  private final IngestionMode ingestionMode;
+
   @JsonIgnore
   private final String id;
 
@@ -54,9 +83,16 @@ public abstract class AbstractTask implements Task
 
   private final Map<String, Object> context;
 
+  private final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+
+  protected AbstractTask(String id, String dataSource, Map<String, Object> 
context, IngestionMode ingestionMode)
+  {
+    this(id, null, null, dataSource, context, ingestionMode);
+  }
+
   protected AbstractTask(String id, String dataSource, Map<String, Object> 
context)
   {
-    this(id, null, null, dataSource, context);
+    this(id, null, null, dataSource, context, IngestionMode.NONE);
   }
 
   protected AbstractTask(
@@ -64,7 +100,8 @@ public abstract class AbstractTask implements Task
       @Nullable String groupId,
       @Nullable TaskResource taskResource,
       String dataSource,
-      @Nullable Map<String, Object> context
+      @Nullable Map<String, Object> context,
+      @Nonnull IngestionMode ingestionMode
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
@@ -73,6 +110,19 @@ public abstract class AbstractTask implements Task
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     // Copy the given context into a new mutable map because the Druid 
indexing service can add some internal contexts.
     this.context = context == null ? new HashMap<>() : new HashMap<>(context);
+    this.ingestionMode = ingestionMode;
+    IndexTaskUtils.setTaskDimensions(metricBuilder, this);
+  }
+
+  protected AbstractTask(
+      String id,
+      @Nullable String groupId,
+      @Nullable TaskResource taskResource,
+      String dataSource,
+      @Nullable Map<String, Object> context
+  )
+  {
+    this(id, groupId, taskResource, dataSource, context, IngestionMode.NONE);
   }
 
   public static String getOrMakeId(@Nullable String id, final String typeName, 
String dataSource)
@@ -219,4 +269,60 @@ public abstract class AbstractTask implements Task
         Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
     );
   }
+
+  protected ServiceMetricEvent.Builder getMetricBuilder()
+  {
+    return metricBuilder;
+  }
+
+  public IngestionMode getIngestionMode()
+  {
+    return ingestionMode;
+  }
+
+  protected static IngestionMode computeCompactionIngestionMode(@Nullable 
CompactionIOConfig ioConfig)
+  {
+    // CompactionIOConfig does not have an isAppendToExisting method, so use 
default (for batch since compaction
+    // is basically batch ingestion)
+    final boolean isAppendToExisting = BatchIOConfig.DEFAULT_APPEND_EXISTING;
+    final boolean isDropExisting = ioConfig == null ? 
BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
+    return computeIngestionMode(isAppendToExisting, isDropExisting);
+  }
+
+  protected static IngestionMode computeBatchIngestionMode(@Nullable 
BatchIOConfig ioConfig)
+  {
+    final boolean isAppendToExisting = ioConfig == null
+                                       ? BatchIOConfig.DEFAULT_APPEND_EXISTING
+                                       : ioConfig.isAppendToExisting();
+    final boolean isDropExisting = ioConfig == null ? 
BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
+    return computeIngestionMode(isAppendToExisting, isDropExisting);
+  }
+
+  private static IngestionMode computeIngestionMode(boolean 
isAppendToExisting, boolean isDropExisting)
+  {
+    if (!isAppendToExisting && isDropExisting) {
+      return IngestionMode.REPLACE;
+    } else if (isAppendToExisting && !isDropExisting) {
+      return IngestionMode.APPEND;
+    } else if (!isAppendToExisting) {
+      return IngestionMode.REPLACE_LEGACY;
+    }
+    throw new IAE("Cannot simultaneously replace and append to existing 
segments. "
+                  + "Either dropExisting or appendToExisting should be set to 
false");
+  }
+
+  public void emitMetric(
+      ServiceEmitter emitter,
+      String metric,
+      Number value
+  )
+  {
+
+    if (emitter == null || metric == null || value == null) {
+      return;
+    }
+    emitter.emit(getMetricBuilder().build(metric, value));
+  }
+
+
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
index faedd3da70..3397888006 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.segment.indexing.BatchIOConfig;
 import org.apache.druid.segment.indexing.IOConfig;
 
 import javax.annotation.Nullable;
@@ -46,7 +47,7 @@ public class CompactionIOConfig implements IOConfig
   )
   {
     this.inputSpec = inputSpec;
-    this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : 
dropExisting;
+    this.dropExisting = dropExisting == null ? 
BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
   }
 
   @JsonProperty
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 067f75ec7d..c521dbf3b8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -76,6 +76,7 @@ import 
org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.DimensionHandler;
 import org.apache.druid.segment.DimensionHandlerUtils;
@@ -196,7 +197,15 @@ public class CompactionTask extends AbstractBatchIndexTask
       @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
-    super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, 
context, -1);
+    super(
+        getOrMakeId(id, TYPE, dataSource),
+        null,
+        taskResource,
+        dataSource,
+        context,
+        -1,
+        computeCompactionIngestionMode(ioConfig)
+    );
     Checks.checkOneNotNullOrEmpty(
         ImmutableList.of(
             new Property<>("ioConfig", ioConfig),
@@ -422,9 +431,25 @@ public class CompactionTask extends AbstractBatchIndexTask
     return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
   }
 
+  @VisibleForTesting
+  void emitCompactIngestionModeMetrics(
+      ServiceEmitter emitter,
+      boolean isDropExisting
+  )
+  {
+
+    if (emitter == null) {
+      return;
+    }
+    emitMetric(emitter, "ingest/count", 1);
+  }
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
+
+    // emit metric for compact ingestion mode:
+    emitCompactIngestionModeMetrics(toolbox.getEmitter(), 
ioConfig.isDropExisting());
+
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
createIngestionSchema(
         toolbox,
         getTaskLockHelper().getLockGranularityToUse(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java
index d2022d5f30..39d74537fa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java
@@ -65,7 +65,7 @@ public abstract class HadoopTask extends 
AbstractBatchIndexTask
       Map<String, Object> context
   )
   {
-    super(id, dataSource, context);
+    super(id, dataSource, context, IngestionMode.HADOOP);
     this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 6eb05a1b8f..4aced8eb4d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -136,19 +136,22 @@ import java.util.function.Predicate;
 
 public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 {
+
+
+
   public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
 
   private static final Logger log = new Logger(IndexTask.class);
   private static final String TYPE = "index";
 
-  private static String makeGroupId(IndexIngestionSpec ingestionSchema)
+  private static String makeGroupId(IndexIngestionSpec ingestionSchema, 
IngestionMode ingestionMode)
   {
-    return makeGroupId(ingestionSchema.ioConfig.appendToExisting, 
ingestionSchema.dataSchema.getDataSource());
+    return makeGroupId(ingestionSchema.dataSchema.getDataSource(), 
ingestionMode);
   }
 
-  private static String makeGroupId(boolean isAppendToExisting, String 
dataSource)
+  private static String makeGroupId(String dataSource, IngestionMode 
ingestionMode)
   {
-    if (isAppendToExisting) {
+    if (ingestionMode == IngestionMode.APPEND) {
       // Shared locking group for all tasks that append, since they are OK to 
run concurrently.
       return StringUtils.format("%s_append_%s", TYPE, dataSource);
     } else {
@@ -191,7 +194,10 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   {
     this(
         id,
-        makeGroupId(ingestionSchema),
+        makeGroupId(
+            ingestionSchema,
+            computeBatchIngestionMode(ingestionSchema.getIOConfig())
+        ),
         taskResource,
         ingestionSchema.dataSchema.getDataSource(),
         null,
@@ -218,7 +224,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
         resource,
         dataSource,
         context,
-        maxAllowedLockCount
+        maxAllowedLockCount,
+        computeBatchIngestionMode(ingestionSchema.getIOConfig())
     );
     this.baseSequenceName = baseSequenceName == null ? getId() : 
baseSequenceName;
     this.ingestionSchema = ingestionSchema;
@@ -251,8 +258,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   @Override
   public boolean requireLockExistingSegments()
   {
-    return isGuaranteedRollup(ingestionSchema.ioConfig, 
ingestionSchema.tuningConfig)
-           || !ingestionSchema.ioConfig.isAppendToExisting();
+    return isGuaranteedRollup(getIngestionMode(), ingestionSchema.tuningConfig)
+           || (getIngestionMode() != IngestionMode.APPEND);
   }
 
   @Override
@@ -270,7 +277,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   @Override
   public boolean isPerfectRollup()
   {
-    return isGuaranteedRollup(ingestionSchema.ioConfig, 
ingestionSchema.tuningConfig);
+    return isGuaranteedRollup(getIngestionMode(), 
ingestionSchema.tuningConfig);
   }
 
   @Nullable
@@ -442,6 +449,10 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   public TaskStatus runTask(final TaskToolbox toolbox)
   {
     try {
+
+      // emit metric for sequential batch ingestion mode:
+      emitMetric(toolbox.getEmitter(), "ingest/count", 1);
+
       log.debug("Found chat handler of class[%s]", 
toolbox.getChatHandlerProvider().getClass().getName());
 
       if (toolbox.getChatHandlerProvider().get(getId()).isPresent()) {
@@ -875,7 +886,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
             null,
             dataSchema,
             getTaskLockHelper(),
-            ingestionSchema.getIOConfig().isAppendToExisting(),
+            getIngestionMode(),
             partitionAnalysis.getPartitionsSpec(),
             null
         );
@@ -941,10 +952,13 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
           );
 
       Set<DataSegment> tombStones = Collections.emptySet();
-      if (ingestionSchema.getIOConfig().isDropExisting()) {
-        TombstoneHelper tombstoneHelper = new 
TombstoneHelper(pushed.getSegments(),
-                                                              
ingestionSchema.getDataSchema(),
-                                                              
toolbox.getTaskActionClient());
+      if (getIngestionMode() == IngestionMode.REPLACE) {
+        // check whether to generate tombstones...
+        TombstoneHelper tombstoneHelper = new TombstoneHelper(
+            pushed.getSegments(),
+            ingestionSchema.getDataSchema(),
+            toolbox.getTaskActionClient()
+        );
 
         List<Interval> tombstoneIntervals = 
tombstoneHelper.computeTombstoneIntervals();
         // now find the versions for the tombstone intervals
@@ -957,7 +971,10 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
           );
           tombstonesAndVersions.put(interval, segmentIdWithShardSpec);
         }
+
         tombStones = tombstoneHelper.computeTombstones(tombstonesAndVersions);
+
+
         log.debugSegments(tombStones, "To publish tombstones");
       }
 
@@ -1001,6 +1018,14 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
             buildSegmentsMeters.getThrownAway()
         );
         log.info("Published [%s] segments", published.getSegments().size());
+
+        // publish metrics:
+        emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", 
tombStones.size());
+        // segments count metric is documented to include tombstones
+        emitMetric(toolbox.getEmitter(), "ingest/segments/count",
+                   published.getSegments().size() + tombStones.size()
+        );
+
         log.debugSegments(published.getSegments(), "Published segments");
 
         toolbox.getTaskReportFileWriter().write(getId(), 
getTaskCompletionReports());
@@ -1060,8 +1085,12 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
         throw new IAE("Cannot use parser and inputSource together. Try using 
inputFormat instead of parser.");
       }
 
-      if (ioConfig.isDropExisting() && 
dataSchema.getGranularitySpec().inputIntervals().isEmpty()) {
-        throw new IAE("GranularitySpec's intervals cannot be empty when 
setting dropExisting to true.");
+      IngestionMode ingestionMode = 
AbstractTask.computeBatchIngestionMode(ioConfig);
+
+      if (ingestionMode == IngestionMode.REPLACE && 
dataSchema.getGranularitySpec()
+                                                              .inputIntervals()
+                                                              .isEmpty()) {
+        throw new IAE("GranularitySpec's intervals cannot be empty for 
replace.");
       }
 
       if (ioConfig.getInputSource() != null && 
ioConfig.getInputSource().needsFormat()) {
@@ -1103,13 +1132,12 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   @JsonTypeName("index")
   public static class IndexIOConfig implements BatchIOConfig
   {
-    private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
 
     private final FirehoseFactory firehoseFactory;
     private final InputSource inputSource;
     private final InputFormat inputFormat;
-    private final boolean appendToExisting;
-    private final boolean dropExisting;
+    private boolean appendToExisting;
+    private boolean dropExisting;
 
     @JsonCreator
     public IndexIOConfig(
@@ -1129,12 +1157,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       this.firehoseFactory = firehoseFactory;
       this.inputSource = inputSource;
       this.inputFormat = inputFormat;
-      this.appendToExisting = appendToExisting == null ? 
DEFAULT_APPEND_TO_EXISTING : appendToExisting;
-      this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : 
dropExisting;
-      if (this.dropExisting && this.appendToExisting) {
-        throw new IAE("Cannot both drop existing segments and append to 
existing segments. "
-                      + "Either dropExisting or appendToExisting should be set 
to false");
-      }
+      this.appendToExisting = appendToExisting == null ? 
BatchIOConfig.DEFAULT_APPEND_EXISTING : appendToExisting;
+      this.dropExisting = dropExisting == null ? 
BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
     }
 
     // old constructor for backward compatibility
@@ -1191,14 +1215,12 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     }
 
     @Override
-    @JsonProperty
     public boolean isAppendToExisting()
     {
       return appendToExisting;
     }
 
     @Override
-    @JsonProperty
     public boolean isDropExisting()
     {
       return dropExisting;
@@ -1731,4 +1753,5 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
              '}';
     }
   }
+
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 002b044399..44b838f69a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -114,6 +114,14 @@ public class IndexTaskUtils
     metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
   }
 
+  public static void setTaskDimensions(final ServiceMetricEvent.Builder 
metricBuilder, final AbstractTask task)
+  {
+    metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
+    metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
+    metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
+    metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, 
((AbstractTask) task).getIngestionMode());
+  }
+
   public static void setTaskStatusDimensions(
       final ServiceMetricEvent.Builder metricBuilder,
       final TaskStatus taskStatus
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
index d2022589b1..bd9c3c8b45 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
@@ -55,7 +55,7 @@ public class OverlordCoordinatingSegmentAllocator implements 
SegmentAllocatorFor
       final @Nullable SupervisorTaskAccess supervisorTaskAccess,
       final DataSchema dataSchema,
       final TaskLockHelper taskLockHelper,
-      final boolean appendToExisting,
+      final AbstractTask.IngestionMode ingestionMode,
       final PartitionsSpec partitionsSpec
   )
   {
@@ -72,7 +72,7 @@ public class OverlordCoordinatingSegmentAllocator implements 
SegmentAllocatorFor
               .bucketInterval(row.getTimestamp())
               
.or(granularitySpec.getSegmentGranularity().bucket(row.getTimestamp()));
           final PartialShardSpec partialShardSpec = createPartialShardSpec(
-              appendToExisting,
+              ingestionMode,
               partitionsSpec,
               taskLockHelper,
               interval
@@ -107,7 +107,7 @@ public class OverlordCoordinatingSegmentAllocator 
implements SegmentAllocatorFor
   }
 
   private static PartialShardSpec createPartialShardSpec(
-      boolean appendToExisting,
+      AbstractTask.IngestionMode ingestionMode,
       PartitionsSpec partitionsSpec,
       TaskLockHelper taskLockHelper,
       Interval interval
@@ -115,7 +115,8 @@ public class OverlordCoordinatingSegmentAllocator 
implements SegmentAllocatorFor
   {
     if (partitionsSpec.getType() == SecondaryPartitionType.LINEAR) {
       if (taskLockHelper.isUseSegmentLock()) {
-        if (taskLockHelper.hasOverwritingRootGenerationPartition(interval) && 
!appendToExisting) {
+        if (taskLockHelper.hasOverwritingRootGenerationPartition(interval)
+            && ingestionMode != AbstractTask.IngestionMode.APPEND) {
           final OverwritingRootGenerationPartitions 
overwritingRootGenerationPartitions = taskLockHelper
               .getOverwritingRootGenerationPartition(interval);
           if (overwritingRootGenerationPartitions == null) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
index 7bb72ca332..1e0216d810 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
@@ -43,19 +43,19 @@ public final class SegmentAllocators
       final @Nullable SupervisorTaskAccess supervisorTaskAccess,
       final DataSchema dataSchema,
       final TaskLockHelper taskLockHelper,
-      final boolean appendToExisting,
+      final AbstractTask.IngestionMode ingestionMode,
       final PartitionsSpec partitionsSpec,
       final @Nullable Boolean useLineageBasedSegmentAllocation
   ) throws IOException
   {
-    if (appendToExisting || taskLockHelper.isUseSegmentLock()) {
+    if (ingestionMode == AbstractTask.IngestionMode.APPEND || 
taskLockHelper.isUseSegmentLock()) {
       return new OverlordCoordinatingSegmentAllocator(
           toolbox,
           sequenceName,
           supervisorTaskAccess,
           dataSchema,
           taskLockHelper,
-          appendToExisting,
+          ingestionMode,
           partitionsSpec
       );
     } else {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
index 9b721ada5b..37b70c53ed 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
@@ -22,25 +22,23 @@ package 
org.apache.druid.indexing.common.task.batch.parallel;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.TaskResource;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.Map;
 
 public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
 {
-  protected AbstractBatchSubtask(String id, String dataSource, Map<String, 
Object> context)
-  {
-    super(id, dataSource, context);
-  }
 
   protected AbstractBatchSubtask(
       String id,
       @Nullable String groupId,
       @Nullable TaskResource taskResource,
       String dataSource,
-      @Nullable Map<String, Object> context
+      @Nullable Map<String, Object> context,
+      @Nonnull IngestionMode ingestionMode
   )
   {
-    super(id, groupId, taskResource, dataSource, context, -1);
+    super(id, groupId, taskResource, dataSource, context, -1, ingestionMode);
   }
 
   /**
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 02425ff91d..058b869dda 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -193,6 +193,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
   private IngestionState ingestionState;
 
+
   @JsonCreator
   public ParallelIndexSupervisorTask(
       @JsonProperty("id") String id,
@@ -220,25 +221,26 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
         taskResource,
         ingestionSchema.getDataSchema().getDataSource(),
         context,
-        ingestionSchema.getTuningConfig().getMaxAllowedLockCount()
+        ingestionSchema.getTuningConfig().getMaxAllowedLockCount(),
+        computeBatchIngestionMode(ingestionSchema.getIOConfig())
     );
 
     this.ingestionSchema = ingestionSchema;
     this.baseSubtaskSpecName = baseSubtaskSpecName == null ? getId() : 
baseSubtaskSpecName;
-
-    if (ingestionSchema.getIOConfig().isDropExisting() &&
+    if (getIngestionMode() == IngestionMode.REPLACE &&
         
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
 {
-      throw new ISE("GranularitySpec's intervals cannot be empty when setting 
dropExisting to true.");
+      throw new ISE("GranularitySpec's intervals cannot be empty when using 
replace.");
     }
 
-    if (isGuaranteedRollup(ingestionSchema.getIOConfig(), 
ingestionSchema.getTuningConfig())) {
+    if (isGuaranteedRollup(getIngestionMode(), 
ingestionSchema.getTuningConfig())) {
       
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
     }
 
     this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
         ingestionSchema.getDataSchema().getParser()
     );
-    this.missingIntervalsInOverwriteMode = 
!ingestionSchema.getIOConfig().isAppendToExisting()
+    this.missingIntervalsInOverwriteMode = (getIngestionMode()
+                                            != IngestionMode.APPEND)
                                            && ingestionSchema.getDataSchema()
                                                              
.getGranularitySpec()
                                                              .inputIntervals()
@@ -427,13 +429,16 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Override
   public boolean requireLockExistingSegments()
   {
-    return !ingestionSchema.getIOConfig().isAppendToExisting();
+    return getIngestionMode() != IngestionMode.APPEND;
   }
 
   @Override
   public boolean isPerfectRollup()
   {
-    return isGuaranteedRollup(getIngestionSchema().getIOConfig(), 
getIngestionSchema().getTuningConfig());
+    return isGuaranteedRollup(
+        getIngestionMode(),
+        getIngestionSchema().getTuningConfig()
+    );
   }
 
   @Nullable
@@ -451,6 +456,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
+
     if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
         != TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS) {
       LOG.warn("maxSavedParseExceptions is not supported yet");
@@ -489,8 +495,14 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       initializeSubTaskCleaner();
 
       if (isParallelMode()) {
+        // emit metric for parallel batch ingestion mode:
+        emitMetric(toolbox.getEmitter(), "ingest/count", 1);
+
         this.toolbox = toolbox;
-        if (isGuaranteedRollup(ingestionSchema.getIOConfig(), 
ingestionSchema.getTuningConfig())) {
+        if (isGuaranteedRollup(
+            getIngestionMode(),
+            ingestionSchema.getTuningConfig()
+        )) {
           return runMultiPhaseParallel(toolbox);
         } else {
           return runSinglePhaseParallel(toolbox);
@@ -1108,8 +1120,9 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
         ingestionSchema
     );
 
-    Set<DataSegment> tombStones;
-    if (ingestionSchema.getIOConfig().isDropExisting()) {
+
+    Set<DataSegment> tombStones = Collections.emptySet();
+    if (getIngestionMode() == IngestionMode.REPLACE) {
       TombstoneHelper tombstoneHelper = new TombstoneHelper(
           newSegments,
           ingestionSchema.getDataSchema(),
@@ -1127,8 +1140,11 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
           );
           tombstonesAnShards.put(interval, segmentIdWithShardSpec);
         }
+
         tombStones = tombstoneHelper.computeTombstones(tombstonesAnShards);
+        // add tombstones
         newSegments.addAll(tombStones);
+
         LOG.debugSegments(tombStones, "To publish tombstones");
       }
     }
@@ -1146,6 +1162,11 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
     if (published) {
       LOG.info("Published [%d] segments", newSegments.size());
+
+      // segment metrics:
+      emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", 
tombStones.size());
+      emitMetric(toolbox.getEmitter(), "ingest/segments/count", 
newSegments.size());
+
     } else {
       throw new ISE("Failed to publish segments");
     }
@@ -1675,7 +1696,10 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     }
 
     if (isParallelMode()) {
-      if (isGuaranteedRollup(ingestionSchema.getIOConfig(), 
ingestionSchema.getTuningConfig())) {
+      if (isGuaranteedRollup(
+          getIngestionMode(),
+          ingestionSchema.getTuningConfig()
+      )) {
         return doGetRowStatsAndUnparseableEventsParallelMultiPhase(
             (ParallelIndexTaskRunner<?, ?>) currentRunner,
             includeUnparseable
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
index 35e00b4136..4259922b43 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
@@ -52,7 +52,7 @@ abstract class PerfectRollupWorkerTask extends 
AbstractBatchSubtask
       @Nullable Map<String, Object> context
   )
   {
-    super(id, groupId, taskResource, dataSchema.getDataSource(), context);
+    super(id, groupId, taskResource, dataSchema.getDataSource(), context, 
IngestionMode.NONE);
 
     Preconditions.checkArgument(
         tuningConfig.isForceGuaranteedRollup(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 2970c509b7..0d47ea02d0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.BatchAppenderators;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.IndexTask;
@@ -161,7 +162,8 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
         groupId,
         taskResource,
         ingestionSchema.getDataSchema().getDataSource(),
-        context
+        context,
+        AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig())
     );
 
     if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
@@ -172,7 +174,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
     this.supervisorTaskId = supervisorTaskId;
-    this.missingIntervalsInOverwriteMode = 
!ingestionSchema.getIOConfig().isAppendToExisting()
+    this.missingIntervalsInOverwriteMode = getIngestionMode() != IngestionMode.APPEND
                                            && ingestionSchema.getDataSchema()
                                                              
.getGranularitySpec()
                                                              .inputIntervals()
@@ -298,7 +300,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
   @Override
   public boolean requireLockExistingSegments()
   {
-    return !ingestionSchema.getIOConfig().isAppendToExisting();
+    return getIngestionMode() != IngestionMode.APPEND;
   }
 
   @Override
@@ -388,7 +390,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
         new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
         getIngestionSchema().getDataSchema(),
         getTaskLockHelper(),
-        ingestionSchema.getIOConfig().isAppendToExisting(),
+        getIngestionMode(),
         partitionsSpec,
         useLineageBasedSegmentAllocation
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index b74569f7f3..f8502bac26 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -95,7 +95,8 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
         groupId,
         taskResource,
         dataSchema.getDataSource(),
-        context
+        context,
+        IngestionMode.APPEND
     );
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, 
"tuningConfig");
@@ -146,6 +147,7 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   @Override
   public TaskStatus run(final TaskToolbox toolbox)
   {
+    emitMetric(toolbox.getEmitter(), "ingest/count", 1);
     return getRunner().run(toolbox);
   }
 
@@ -261,7 +263,6 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
         );
       }
     }
-
     return !beforeMinimumMessageTime && !afterMaximumMessageTime;
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index dce3b09aa7..6d55f841a5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1025,6 +1025,17 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                   }
                 }
             );
+            // emit the published segment count under the same metric name the batch tasks use:
+            int segmentCount = 0;
+            if (publishedSegmentsAndCommitMetadata != null
+                && publishedSegmentsAndCommitMetadata.getSegments() != null) {
+              segmentCount = publishedSegmentsAndCommitMetadata.getSegments().size();
+            }
+            task.emitMetric(
+                toolbox.getEmitter(),
+                "ingest/segments/count",
+                segmentCount
+            );
           }
 
           @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
new file mode 100644
index 0000000000..dc3202c4ff
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link AbstractTask.IngestionMode#fromString} maps each of the mode names
+ * exercised below to the enum constant of the same name.
+ */
+public class AbstractTaskTest
+{
+
+  @Test
+  public void testIngestionModeFromStringAppend()
+  {
+    AbstractTask.IngestionMode ingestionMode = AbstractTask.IngestionMode.fromString("APPEND");
+    Assert.assertEquals(AbstractTask.IngestionMode.APPEND, ingestionMode);
+  }
+
+  @Test
+  public void testIngestionModeFromStringReplace()
+  {
+    AbstractTask.IngestionMode ingestionMode = AbstractTask.IngestionMode.fromString("REPLACE");
+    Assert.assertEquals(AbstractTask.IngestionMode.REPLACE, ingestionMode);
+  }
+
+  @Test
+  public void testIngestionModeFromStringReplaceLegacy()
+  {
+    AbstractTask.IngestionMode ingestionMode = AbstractTask.IngestionMode.fromString("REPLACE_LEGACY");
+    Assert.assertEquals(AbstractTask.IngestionMode.REPLACE_LEGACY, ingestionMode);
+  }
+
+  @Test
+  public void testIngestionModeFromStringHadoop()
+  {
+    AbstractTask.IngestionMode ingestionMode = AbstractTask.IngestionMode.fromString("HADOOP");
+    Assert.assertEquals(AbstractTask.IngestionMode.HADOOP, ingestionMode);
+  }
+
+  @Test
+  public void testIngestionModeFromStringNone()
+  {
+    AbstractTask.IngestionMode ingestionMode = AbstractTask.IngestionMode.fromString("NONE");
+    Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode);
+  }
+
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index a0f85232f1..b88cf10a3a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -82,6 +82,8 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
@@ -112,8 +114,8 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.ListIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.BatchIOConfig;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.IOConfig;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -399,6 +401,27 @@ public class CompactionTaskTest
     );
   }
 
+  @Test
+  public void testCompactionTaskEmitter()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder.tuningConfig(createTuningConfig());
+    builder.segmentGranularity(Granularities.HOUR);
+    final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
+
+    // null emitter should work
+    taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(null, 
false);
+    // non-null should also work
+    ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new 
NoopEmitter());
+    
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, 
false);
+    
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, 
true);
+  }
+
   @Test(expected = IAE.class)
   public void 
testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE()
   {
@@ -898,7 +921,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -916,7 +939,7 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -972,7 +995,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -991,7 +1014,7 @@ public class CompactionTaskTest
         tuningConfig,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1047,7 +1070,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1066,7 +1089,7 @@ public class CompactionTaskTest
         tuningConfig,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1122,7 +1145,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1141,7 +1164,7 @@ public class CompactionTaskTest
         tuningConfig,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1187,7 +1210,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
 
     ingestionSpecs.sort(
@@ -1206,7 +1229,7 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1232,7 +1255,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
 
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
@@ -1251,7 +1274,7 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1270,7 +1293,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1288,7 +1311,7 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1314,7 +1337,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1339,7 +1362,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1376,7 +1399,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
@@ -1396,7 +1419,7 @@ public class CompactionTaskTest
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null),
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1415,7 +1438,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1433,7 +1456,7 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         Granularities.MONTH,
         new PeriodGranularity(Period.months(3), null, null),
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1456,7 +1479,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
@@ -1476,7 +1499,7 @@ public class CompactionTaskTest
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null),
         new PeriodGranularity(Period.months(3), null, null),
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1495,7 +1518,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1513,7 +1536,7 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1533,7 +1556,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1551,7 +1574,7 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         Granularities.MONTH,
         Granularities.NONE,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1571,7 +1594,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
 
     Assert.assertEquals(6, ingestionSpecs.size());
@@ -1596,7 +1619,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
-        IOConfig.DEFAULT_DROP_EXISTING
+        BatchIOConfig.DEFAULT_DROP_EXISTING
     );
     Assert.assertEquals(6, ingestionSpecs.size());
     for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
@@ -1913,7 +1936,10 @@ public class CompactionTaskTest
       // assert ioConfig
       final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
       Assert.assertFalse(ioConfig.isAppendToExisting());
-      Assert.assertEquals(expectedDropExisting, ioConfig.isDropExisting());
+      Assert.assertEquals(
+          expectedDropExisting,
+          ioConfig.isDropExisting()
+      );
       final InputSource inputSource = ioConfig.getInputSource();
       Assert.assertTrue(inputSource instanceof DruidInputSource);
       final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
@@ -2245,7 +2271,7 @@ public class CompactionTaskTest
         @JacksonInject AppenderatorsManager appenderatorsManager
     )
     {
-      super(getOrMakeId(id, "compact", dataSource), null, taskResource, 
dataSource, context);
+      super(getOrMakeId(id, "compact", dataSource), null, taskResource, 
dataSource, context, IngestionMode.REPLACE_LEGACY);
       this.interval = interval;
       this.segments = segments;
       this.dimensionsSpec = dimensionsSpec;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 856de0c105..babc3ff1fd 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -858,7 +858,7 @@ public class IndexTaskTest extends IngestionTestBase
     // Expect exception if reingest with dropExisting and null intervals is attempted
     expectedException.expect(IAE.class);
     expectedException.expectMessage(
-        "GranularitySpec's intervals cannot be empty when setting dropExisting to true."
+        "GranularitySpec's intervals cannot be empty for replace."
     );
     IndexTask indexTask = new IndexTask(
         null,
@@ -2700,7 +2700,7 @@ public class IndexTaskTest extends IngestionTestBase
   {
     expectedException.expect(IAE.class);
     expectedException.expectMessage(
-        "Cannot both drop existing segments and append to existing segments. Either dropExisting or appendToExisting should be set to false"
+        "Cannot simultaneously replace and append to existing segments. Either dropExisting or appendToExisting should be set to false"
     );
     new IndexTask(
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 20c7eb4fc2..499ba22bb1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -94,6 +94,7 @@ public class TaskSerdeTest
     );
 
     Assert.assertEquals(false, ioConfig.isAppendToExisting());
+    Assert.assertEquals(false, ioConfig.isDropExisting());
   }
 
   @Test
@@ -293,6 +294,7 @@ public class TaskSerdeTest
     Assert.assertTrue(taskIoConfig.getInputSource() instanceof LocalInputSource);
     Assert.assertTrue(task2IoConfig.getInputSource() instanceof LocalInputSource);
     Assert.assertEquals(taskIoConfig.isAppendToExisting(), task2IoConfig.isAppendToExisting());
+    Assert.assertEquals(taskIoConfig.isDropExisting(), task2IoConfig.isDropExisting());
 
     IndexTask.IndexTuningConfig taskTuningConfig = task.getIngestionSchema().getTuningConfig();
     IndexTask.IndexTuningConfig task2TuningConfig = task2.getIngestionSchema().getTuningConfig();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 1b6e858c29..a187e9df8d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -559,7 +559,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
                   baseInputSource.withSplit(split),
                   getIngestionSchema().getIOConfig().getInputFormat(),
                   getIngestionSchema().getIOConfig().isAppendToExisting(),
-                  null
+                  getIngestionSchema().getIOConfig().isDropExisting()
               ),
               getIngestionSchema().getTuningConfig()
           ),
@@ -684,7 +684,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
           new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
           getIngestionSchema().getDataSchema(),
           getTaskLockHelper(),
-          getIngestionSchema().getIOConfig().isAppendToExisting(),
+          AbstractTask.computeBatchIngestionMode(getIngestionSchema().getIOConfig()),
           partitionsSpec,
           true
       );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index c22c25531d..71b474e773 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -164,7 +164,7 @@ public class PerfectRollupWorkerTaskTest
     @Override
     public String getType()
     {
-      throw new UnsupportedOperationException();
+      return "TestPerfectRollupWorkerTask";
     }
 
     @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index dc5fc0e1c6..a90ad8b3c0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -362,7 +362,7 @@ public class TaskQueueTest extends IngestionTestBase
 
     private TestTask(String id, Interval interval, Map<String, Object> context)
     {
-      super(id, "datasource", context);
+      super(id, "datasource", context, IngestionMode.NONE);
       this.interval = interval;
     }
 
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index bb3519d84e..5e6bcb2d5f 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -34,6 +34,7 @@ public class DruidMetrics
   public static final String ID = "id";
   public static final String TASK_ID = "taskId";
   public static final String STATUS = "status";
+  public static final String TASK_INGESTION_MODE = "taskIngestionMode";
 
   public static final String PARTITIONING_TYPE = "partitioningType";
 
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
index 0419bc2b5f..348d9d22da 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -21,7 +21,7 @@ package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.indexing.IOConfig;
+import org.apache.druid.segment.indexing.BatchIOConfig;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -45,7 +45,7 @@ public class ClientCompactionIOConfig
   )
   {
     this.inputSpec = inputSpec;
-    this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting;
+    this.dropExisting = dropExisting == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
   }
 
   @JsonProperty
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java
index b05b38145a..946f1bca61 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.indexing;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
 
@@ -27,11 +28,17 @@ import org.apache.druid.data.input.InputSource;
  */
 public interface BatchIOConfig extends IOConfig
 {
+  boolean DEFAULT_DROP_EXISTING = false;
+  boolean DEFAULT_APPEND_EXISTING = false;
+
   InputSource getInputSource();
 
   InputFormat getInputFormat();
 
+  @JsonProperty
   boolean isAppendToExisting();
 
+  @JsonProperty
   boolean isDropExisting();
 }
+
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
index 58f84c2bc4..b1784806e7 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
@@ -30,5 +30,4 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 })
 public interface IOConfig
 {
-  boolean DEFAULT_DROP_EXISTING = false;
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
index df5af287ac..a57104683b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.indexing.BatchIOConfig;
 import org.apache.druid.segment.indexing.IOConfig;
 
 import javax.annotation.Nullable;
@@ -43,7 +44,7 @@ public class UserCompactionTaskIOConfig
       @JsonProperty("dropExisting") @Nullable Boolean dropExisting
   )
   {
-    this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting;
+    this.dropExisting = dropExisting == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
   }
 
   @JsonProperty
diff --git a/website/.spelling b/website/.spelling
index fd16c89754..44b7fc3349 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1468,6 +1468,7 @@ poolName
 remoteAddress
 segmentAvailabilityConfirmed
 serviceName
+taskIngestionMode
 taskStatus
 taskType
 threadPoolNumBusyThreads.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to