This is an automated email from the ASF dual-hosted git repository.
agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2f3d7a4c07 Emit state of replace and append for native batch tasks (#12488)
2f3d7a4c07 is described below
commit 2f3d7a4c076316797b1cef5fc31c4ab5ac67cf59
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Mon May 23 12:32:47 2022 -0700
Emit state of replace and append for native batch tasks (#12488)
* Emit state of replace and append for native batch tasks
* Emit count of one depending on batch ingestion mode (APPEND, OVERWRITE, REPLACE)
* Add metric to compaction job
* Avoid null ptr exc when null emitter
* Coverage
* Emit tombstone & segment counts
* Tasks need a type
* Spelling
* Integrate BatchIngestionMode in batch ingestion tasks functionality
* Typos
* Remove batch ingestion type from metric since it is already in a dimension. Move IngestionMode to AbstractTask to facilitate having mode as a dimension. Add metrics to streaming. Add missing coverage.
* Avoid inner class referenced by sub-class inspection. Refactor computation of IngestionMode to make it more robust to null IOConfig and fix test.
* Spelling
* Avoid polluting the Task interface
* Rename computeCompaction methods to avoid ambiguous java compiler error if they are passed null. Other minor cleanup.
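The flag-to-mode decision described in the bullets above can be sketched as a small standalone class. This is a hypothetical sketch (the class and method names here are stand-ins, not Druid's actual `AbstractTask` code) of how the two `IOConfig` flags map to an ingestion mode:

```java
// Hypothetical sketch of the two-flag ingestion-mode decision; names are
// illustrative, not Druid's actual AbstractTask internals.
public class IngestionModeSketch
{
  public enum IngestionMode
  {
    REPLACE,        // replace existing data, generating tombstones
    APPEND,         // append to existing segments
    REPLACE_LEGACY  // original replace, predates tombstones
  }

  public static IngestionMode compute(boolean isAppendToExisting, boolean isDropExisting)
  {
    if (!isAppendToExisting && isDropExisting) {
      return IngestionMode.REPLACE;
    } else if (isAppendToExisting && !isDropExisting) {
      return IngestionMode.APPEND;
    } else if (!isAppendToExisting) {
      // both flags false: the default for native batch ingestion
      return IngestionMode.REPLACE_LEGACY;
    }
    // isAppendToExisting && isDropExisting is the invalid combination
    throw new IllegalArgumentException("Cannot simultaneously replace and append to existing segments");
  }
}
```

The commit's actual implementation also covers `HADOOP` and `NONE` modes, which are set directly by task constructors rather than derived from flags.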
---
docs/operations/metrics.md | 28 ++++++
.../common/task/AbstractBatchIndexTask.java | 27 +++--
.../druid/indexing/common/task/AbstractTask.java | 110 ++++++++++++++++++++-
.../indexing/common/task/CompactionIOConfig.java | 3 +-
.../druid/indexing/common/task/CompactionTask.java | 27 ++++-
.../druid/indexing/common/task/HadoopTask.java | 2 +-
.../druid/indexing/common/task/IndexTask.java | 77 ++++++++++-----
.../druid/indexing/common/task/IndexTaskUtils.java | 8 ++
.../task/OverlordCoordinatingSegmentAllocator.java | 9 +-
.../indexing/common/task/SegmentAllocators.java | 6 +-
.../task/batch/parallel/AbstractBatchSubtask.java | 10 +-
.../parallel/ParallelIndexSupervisorTask.java | 48 ++++++---
.../batch/parallel/PerfectRollupWorkerTask.java | 2 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 10 +-
.../seekablestream/SeekableStreamIndexTask.java | 5 +-
.../SeekableStreamIndexTaskRunner.java | 11 +++
.../indexing/common/task/AbstractTaskTest.java | 63 ++++++++++++
.../indexing/common/task/CompactionTaskTest.java | 88 +++++++++++------
.../druid/indexing/common/task/IndexTaskTest.java | 4 +-
.../druid/indexing/common/task/TaskSerdeTest.java | 2 +
.../ParallelIndexSupervisorTaskResourceTest.java | 4 +-
.../parallel/PerfectRollupWorkerTaskTest.java | 2 +-
.../druid/indexing/overlord/TaskQueueTest.java | 2 +-
.../java/org/apache/druid/query/DruidMetrics.java | 1 +
.../client/indexing/ClientCompactionIOConfig.java | 4 +-
.../druid/segment/indexing/BatchIOConfig.java | 7 ++
.../apache/druid/segment/indexing/IOConfig.java | 1 -
.../coordinator/UserCompactionTaskIOConfig.java | 3 +-
website/.spelling | 1 +
29 files changed, 450 insertions(+), 115 deletions(-)
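One of the commit bullets, "Avoid null ptr exc when null emitter", becomes a null-guard around metric emission in the diff below. A minimal sketch of that guard pattern, using a hypothetical stand-in `Emitter` interface rather than Druid's `ServiceEmitter`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini-emitter illustrating the null-guard pattern: skip emission
// silently when the emitter, metric name, or value is missing, instead of
// throwing a NullPointerException mid-task.
public class EmitMetricSketch
{
  public interface Emitter
  {
    void emit(String metric, Number value);
  }

  // records what was emitted, for demonstration
  public static final List<String> RECORDED = new ArrayList<>();

  public static void emitMetric(Emitter emitter, String metric, Number value)
  {
    if (emitter == null || metric == null || value == null) {
      return; // no-op rather than NPE
    }
    emitter.emit(metric, value);
  }
}
```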
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index eb86c91702..59fbb9f908 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -144,6 +144,31 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.
## Ingestion metrics
+## General native ingestion metrics
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions.|dataSource, taskId, taskType, taskIngestionMode|Always `1`.|
+|`ingest/segments/count`|Count of final segments created by job (includes tombstones).|dataSource, taskId, taskType, taskIngestionMode|At least `1`.|
+|`ingest/tombstones/count`|Count of tombstones created by job.|dataSource, taskId, taskType, taskIngestionMode|Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
+
+The `taskIngestionMode` dimension takes one of the following values:
+`APPEND`, `REPLACE_LEGACY`, and `REPLACE`. `APPEND` indicates a native ingestion
+job appending to existing segments; `REPLACE` indicates a native ingestion job
+replacing existing segments using tombstones; and `REPLACE_LEGACY` indicates the
+original replace mode, which predates tombstones.
+
+The mode is decided using the values of the `isAppendToExisting` and
+`isDropExisting` flags in the task's `IOConfig` as follows:
+
+|`isAppendToExisting`|`isDropExisting`|mode|
+|--------------------|----------------|----|
+|`true`|`false`|`APPEND`|
+|`true`|`true`|Invalid combination, exception thrown.|
+|`false`|`false`|`REPLACE_LEGACY` (this is the default for native batch ingestion).|
+|`false`|`true`|`REPLACE`|
+
### Ingestion metrics for Kafka
These metrics apply to the [Kafka indexing service](../development/extensions-core/kafka-ingestion.md).
@@ -221,6 +246,8 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|
+
+
## Shuffle metrics (Native parallel task)
The shuffle metrics can be enabled by adding `org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` in `druid.monitoring.monitors`
@@ -231,6 +258,7 @@ See [Enabling Metrics](../configuration/index.md#enabling-metrics) for more deta
|`ingest/shuffle/bytes`|Number of bytes shuffled per emission period.|supervisorTaskId|Varies|
|`ingest/shuffle/requests`|Number of shuffle requests per emission period.|supervisorTaskId|Varies|
+
## Coordination
These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 44de1c93cc..8fcadc301d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -127,9 +127,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
// Store lock versions
Map<Interval, String> intervalToVersion = new HashMap<>();
-  protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context)
+  protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
{
- super(id, dataSource, context);
+ super(id, dataSource, context, ingestionMode);
maxAllowedLockCount = -1;
}
@@ -139,10 +139,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
@Nullable TaskResource taskResource,
String dataSource,
@Nullable Map<String, Object> context,
- int maxAllowedLockCount
+ int maxAllowedLockCount,
+ IngestionMode ingestionMode
)
{
- super(id, groupId, taskResource, dataSource, context);
+ super(id, groupId, taskResource, dataSource, context, ingestionMode);
this.maxAllowedLockCount = maxAllowedLockCount;
}
@@ -306,11 +307,14 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
);
-    final boolean useSharedLock = ioConfig.isAppendToExisting() && getContextValue(Tasks.USE_SHARED_LOCK, false);
+    IngestionMode ingestionMode = getIngestionMode();
+    final boolean useSharedLock = ingestionMode == IngestionMode.APPEND
+        && getContextValue(Tasks.USE_SHARED_LOCK, false);
// Respect task context value most.
-    if (forceTimeChunkLock || ioConfig.isDropExisting()) {
-      log.info("forceTimeChunkLock[%s] or isDropExisting[%s] is set to true. Use timeChunk lock",
-               forceTimeChunkLock, ioConfig.isDropExisting()
+    if (forceTimeChunkLock || ingestionMode == IngestionMode.REPLACE) {
+      log.info(
+          "forceTimeChunkLock[%s] is set to true or mode[%s] is replace. Use timeChunk lock",
+          forceTimeChunkLock, ingestionMode
      );
taskLockHelper = new TaskLockHelper(false, useSharedLock);
if (!intervals.isEmpty()) {
@@ -505,10 +509,13 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
   * the start partition ID of the set of perfectly rolled up segments is 0. Instead it might need to store an ordinal
   * in addition to the partition ID which represents the ordinal in the perfectly rolled up segment set.
*/
-  public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
+  public static boolean isGuaranteedRollup(
+      IngestionMode ingestionMode,
+      IndexTuningConfig tuningConfig
+  )
  {
    Preconditions.checkArgument(
-        !tuningConfig.isForceGuaranteedRollup() || !ioConfig.isAppendToExisting(),
+        !(ingestionMode == IngestionMode.APPEND && tuningConfig.isForceGuaranteedRollup()),
        "Perfect rollup cannot be guaranteed when appending to existing dataSources"
);
return tuningConfig.isForceGuaranteedRollup();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 23adeb6ff8..8afcc09005 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
@@ -28,10 +29,16 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
+import org.apache.druid.segment.indexing.BatchIOConfig;
import org.joda.time.Interval;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
@@ -40,6 +47,28 @@ import java.util.Map;
public abstract class AbstractTask implements Task
{
+
+  // This is mainly to avoid using combinations of IOConfig flags to figure out the ingestion mode and
+ // also to use the mode as dimension in metrics
+ public enum IngestionMode
+ {
+ REPLACE, // replace with tombstones
+ APPEND, // append to existing segments
+    REPLACE_LEGACY, // original replace, it does not replace existing data for empty time chunks in input intervals
+ HADOOP, // non-native batch, hadoop ingestion
+ NONE; // not an ingestion task (i.e. a kill task)
+
+ @JsonCreator
+ public static IngestionMode fromString(String name)
+ {
+ if (name == null) {
+ return null;
+ }
+ return valueOf(StringUtils.toUpperCase(name));
+ }
+ }
+ private final IngestionMode ingestionMode;
+
@JsonIgnore
private final String id;
@@ -54,9 +83,16 @@ public abstract class AbstractTask implements Task
private final Map<String, Object> context;
+  private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+
+  protected AbstractTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
+ {
+ this(id, null, null, dataSource, context, ingestionMode);
+ }
+
  protected AbstractTask(String id, String dataSource, Map<String, Object> context)
{
- this(id, null, null, dataSource, context);
+ this(id, null, null, dataSource, context, IngestionMode.NONE);
}
protected AbstractTask(
@@ -64,7 +100,8 @@ public abstract class AbstractTask implements Task
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
- @Nullable Map<String, Object> context
+ @Nullable Map<String, Object> context,
+ @Nonnull IngestionMode ingestionMode
)
{
this.id = Preconditions.checkNotNull(id, "id");
@@ -73,6 +110,19 @@ public abstract class AbstractTask implements Task
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
    // Copy the given context into a new mutable map because the Druid indexing service can add some internal contexts.
this.context = context == null ? new HashMap<>() : new HashMap<>(context);
+ this.ingestionMode = ingestionMode;
+ IndexTaskUtils.setTaskDimensions(metricBuilder, this);
+ }
+
+ protected AbstractTask(
+ String id,
+ @Nullable String groupId,
+ @Nullable TaskResource taskResource,
+ String dataSource,
+ @Nullable Map<String, Object> context
+ )
+ {
+ this(id, groupId, taskResource, dataSource, context, IngestionMode.NONE);
}
  public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource)
@@ -219,4 +269,60 @@ public abstract class AbstractTask implements Task
Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
);
}
+
+ protected ServiceMetricEvent.Builder getMetricBuilder()
+ {
+ return metricBuilder;
+ }
+
+ public IngestionMode getIngestionMode()
+ {
+ return ingestionMode;
+ }
+
+  protected static IngestionMode computeCompactionIngestionMode(@Nullable CompactionIOConfig ioConfig)
+  {
+    // CompactionIOConfig does not have an isAppendToExisting method, so use default (for batch since compaction
+    // is basically batch ingestion)
+    final boolean isAppendToExisting = BatchIOConfig.DEFAULT_APPEND_EXISTING;
+    final boolean isDropExisting = ioConfig == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
+ return computeIngestionMode(isAppendToExisting, isDropExisting);
+ }
+
+  protected static IngestionMode computeBatchIngestionMode(@Nullable BatchIOConfig ioConfig)
+  {
+    final boolean isAppendToExisting = ioConfig == null
+        ? BatchIOConfig.DEFAULT_APPEND_EXISTING
+        : ioConfig.isAppendToExisting();
+    final boolean isDropExisting = ioConfig == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
+    return computeIngestionMode(isAppendToExisting, isDropExisting);
+
+  private static IngestionMode computeIngestionMode(boolean isAppendToExisting, boolean isDropExisting)
+  {
+    if (!isAppendToExisting && isDropExisting) {
+      return IngestionMode.REPLACE;
+    } else if (isAppendToExisting && !isDropExisting) {
+      return IngestionMode.APPEND;
+    } else if (!isAppendToExisting) {
+      return IngestionMode.REPLACE_LEGACY;
+    }
+    throw new IAE("Cannot simultaneously replace and append to existing segments. "
+                  + "Either dropExisting or appendToExisting should be set to false");
+  }
+
+ public void emitMetric(
+ ServiceEmitter emitter,
+ String metric,
+ Number value
+ )
+ {
+
+ if (emitter == null || metric == null || value == null) {
+ return;
+ }
+ emitter.emit(getMetricBuilder().build(metric, value));
+ }
+
+
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
index faedd3da70..3397888006 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.indexing.IOConfig;
import javax.annotation.Nullable;
@@ -46,7 +47,7 @@ public class CompactionIOConfig implements IOConfig
)
{
this.inputSpec = inputSpec;
-    this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
+    this.dropExisting = dropExisting == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
}
@JsonProperty
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 067f75ec7d..c521dbf3b8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -76,6 +76,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.DimensionHandlerUtils;
@@ -196,7 +197,15 @@ public class CompactionTask extends AbstractBatchIndexTask
@JacksonInject RetryPolicyFactory retryPolicyFactory
)
{
-    super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context, -1);
+ super(
+ getOrMakeId(id, TYPE, dataSource),
+ null,
+ taskResource,
+ dataSource,
+ context,
+ -1,
+ computeCompactionIngestionMode(ioConfig)
+ );
Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("ioConfig", ioConfig),
@@ -422,9 +431,25 @@ public class CompactionTask extends AbstractBatchIndexTask
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}
+ @VisibleForTesting
+ void emitCompactIngestionModeMetrics(
+ ServiceEmitter emitter,
+ boolean isDropExisting
+ )
+ {
+
+ if (emitter == null) {
+ return;
+ }
+ emitMetric(emitter, "ingest/count", 1);
+ }
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
+
+ // emit metric for compact ingestion mode:
+    emitCompactIngestionModeMetrics(toolbox.getEmitter(), ioConfig.isDropExisting());
+
    final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSchema(
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java
index d2022d5f30..39d74537fa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java
@@ -65,7 +65,7 @@ public abstract class HadoopTask extends AbstractBatchIndexTask
Map<String, Object> context
)
{
- super(id, dataSource, context);
+ super(id, dataSource, context, IngestionMode.HADOOP);
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 6eb05a1b8f..4aced8eb4d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -136,19 +136,22 @@ import java.util.function.Predicate;
public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
{
+
+
+
public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
private static final Logger log = new Logger(IndexTask.class);
private static final String TYPE = "index";
-  private static String makeGroupId(IndexIngestionSpec ingestionSchema)
+  private static String makeGroupId(IndexIngestionSpec ingestionSchema, IngestionMode ingestionMode)
  {
-    return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource());
+    return makeGroupId(ingestionSchema.dataSchema.getDataSource(), ingestionMode);
  }
-  private static String makeGroupId(boolean isAppendToExisting, String dataSource)
+  private static String makeGroupId(String dataSource, IngestionMode ingestionMode)
{
- if (isAppendToExisting) {
+ if (ingestionMode == IngestionMode.APPEND) {
      // Shared locking group for all tasks that append, since they are OK to run concurrently.
return StringUtils.format("%s_append_%s", TYPE, dataSource);
} else {
@@ -191,7 +194,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
{
this(
id,
- makeGroupId(ingestionSchema),
+ makeGroupId(
+ ingestionSchema,
+ computeBatchIngestionMode(ingestionSchema.getIOConfig())
+ ),
taskResource,
ingestionSchema.dataSchema.getDataSource(),
null,
@@ -218,7 +224,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
resource,
dataSource,
context,
- maxAllowedLockCount
+ maxAllowedLockCount,
+ computeBatchIngestionMode(ingestionSchema.getIOConfig())
);
    this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName;
this.ingestionSchema = ingestionSchema;
@@ -251,8 +258,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@Override
public boolean requireLockExistingSegments()
{
-    return isGuaranteedRollup(ingestionSchema.ioConfig, ingestionSchema.tuningConfig)
- || !ingestionSchema.ioConfig.isAppendToExisting();
+ return isGuaranteedRollup(getIngestionMode(), ingestionSchema.tuningConfig)
+ || (getIngestionMode() != IngestionMode.APPEND);
}
@Override
@@ -270,7 +277,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@Override
public boolean isPerfectRollup()
{
-    return isGuaranteedRollup(ingestionSchema.ioConfig, ingestionSchema.tuningConfig);
+    return isGuaranteedRollup(getIngestionMode(), ingestionSchema.tuningConfig);
}
@Nullable
@@ -442,6 +449,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public TaskStatus runTask(final TaskToolbox toolbox)
{
try {
+
+ // emit metric for sequential batch ingestion mode:
+ emitMetric(toolbox.getEmitter(), "ingest/count", 1);
+
      log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName());
if (toolbox.getChatHandlerProvider().get(getId()).isPresent()) {
@@ -875,7 +886,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
null,
dataSchema,
getTaskLockHelper(),
- ingestionSchema.getIOConfig().isAppendToExisting(),
+ getIngestionMode(),
partitionAnalysis.getPartitionsSpec(),
null
);
@@ -941,10 +952,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
);
Set<DataSegment> tombStones = Collections.emptySet();
-    if (ingestionSchema.getIOConfig().isDropExisting()) {
-      TombstoneHelper tombstoneHelper = new TombstoneHelper(pushed.getSegments(),
-                                                            ingestionSchema.getDataSchema(),
-                                                            toolbox.getTaskActionClient());
+ if (getIngestionMode() == IngestionMode.REPLACE) {
+ // check whether to generate tombstones...
+ TombstoneHelper tombstoneHelper = new TombstoneHelper(
+ pushed.getSegments(),
+ ingestionSchema.getDataSchema(),
+ toolbox.getTaskActionClient()
+ );
      List<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervals();
// now find the versions for the tombstone intervals
@@ -957,7 +971,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
);
tombstonesAndVersions.put(interval, segmentIdWithShardSpec);
}
+
tombStones = tombstoneHelper.computeTombstones(tombstonesAndVersions);
+
+
log.debugSegments(tombStones, "To publish tombstones");
}
@@ -1001,6 +1018,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
buildSegmentsMeters.getThrownAway()
);
log.info("Published [%s] segments", published.getSegments().size());
+
+ // publish metrics:
+      emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
+ // segments count metric is documented to include tombstones
+ emitMetric(toolbox.getEmitter(), "ingest/segments/count",
+ published.getSegments().size() + tombStones.size()
+ );
+
log.debugSegments(published.getSegments(), "Published segments");
    toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
@@ -1060,8 +1085,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
      throw new IAE("Cannot use parser and inputSource together. Try using inputFormat instead of parser.");
}
-    if (ioConfig.isDropExisting() && dataSchema.getGranularitySpec().inputIntervals().isEmpty()) {
-      throw new IAE("GranularitySpec's intervals cannot be empty when setting dropExisting to true.");
+    IngestionMode ingestionMode = AbstractTask.computeBatchIngestionMode(ioConfig);
+
+    if (ingestionMode == IngestionMode.REPLACE && dataSchema.getGranularitySpec().inputIntervals().isEmpty()) {
+      throw new IAE("GranularitySpec's intervals cannot be empty for replace.");
}
    if (ioConfig.getInputSource() != null && ioConfig.getInputSource().needsFormat()) {
@@ -1103,13 +1132,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@JsonTypeName("index")
public static class IndexIOConfig implements BatchIOConfig
{
- private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
private final FirehoseFactory firehoseFactory;
private final InputSource inputSource;
private final InputFormat inputFormat;
- private final boolean appendToExisting;
- private final boolean dropExisting;
+ private boolean appendToExisting;
+ private boolean dropExisting;
@JsonCreator
public IndexIOConfig(
@@ -1129,12 +1157,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
this.firehoseFactory = firehoseFactory;
this.inputSource = inputSource;
this.inputFormat = inputFormat;
-    this.appendToExisting = appendToExisting == null ? DEFAULT_APPEND_TO_EXISTING : appendToExisting;
-    this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
-    if (this.dropExisting && this.appendToExisting) {
-      throw new IAE("Cannot both drop existing segments and append to existing segments. "
-                    + "Either dropExisting or appendToExisting should be set to false");
-    }
+    this.appendToExisting = appendToExisting == null ? BatchIOConfig.DEFAULT_APPEND_EXISTING : appendToExisting;
+    this.dropExisting = dropExisting == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
}
// old constructor for backward compatibility
@@ -1191,14 +1215,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
@Override
- @JsonProperty
public boolean isAppendToExisting()
{
return appendToExisting;
}
@Override
- @JsonProperty
public boolean isDropExisting()
{
return dropExisting;
@@ -1731,4 +1753,5 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
'}';
}
}
+
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 002b044399..44b838f69a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -114,6 +114,14 @@ public class IndexTaskUtils
metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
}
+  public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final AbstractTask task)
+ {
+ metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
+ metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
+ metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
+    metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, ((AbstractTask) task).getIngestionMode());
+ }
+
public static void setTaskStatusDimensions(
final ServiceMetricEvent.Builder metricBuilder,
final TaskStatus taskStatus
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
index d2022589b1..bd9c3c8b45 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
@@ -55,7 +55,7 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
final @Nullable SupervisorTaskAccess supervisorTaskAccess,
final DataSchema dataSchema,
final TaskLockHelper taskLockHelper,
- final boolean appendToExisting,
+ final AbstractTask.IngestionMode ingestionMode,
final PartitionsSpec partitionsSpec
)
{
@@ -72,7 +72,7 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
.bucketInterval(row.getTimestamp())
.or(granularitySpec.getSegmentGranularity().bucket(row.getTimestamp()));
final PartialShardSpec partialShardSpec = createPartialShardSpec(
- appendToExisting,
+ ingestionMode,
partitionsSpec,
taskLockHelper,
interval
@@ -107,7 +107,7 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
}
private static PartialShardSpec createPartialShardSpec(
- boolean appendToExisting,
+ AbstractTask.IngestionMode ingestionMode,
PartitionsSpec partitionsSpec,
TaskLockHelper taskLockHelper,
Interval interval
@@ -115,7 +115,8 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
{
if (partitionsSpec.getType() == SecondaryPartitionType.LINEAR) {
if (taskLockHelper.isUseSegmentLock()) {
-        if (taskLockHelper.hasOverwritingRootGenerationPartition(interval) && !appendToExisting) {
+        if (taskLockHelper.hasOverwritingRootGenerationPartition(interval) && (ingestionMode
+            != AbstractTask.IngestionMode.APPEND)) {
          final OverwritingRootGenerationPartitions overwritingRootGenerationPartitions = taskLockHelper
              .getOverwritingRootGenerationPartition(interval);
if (overwritingRootGenerationPartitions == null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
index 7bb72ca332..1e0216d810 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
@@ -43,19 +43,19 @@ public final class SegmentAllocators
final @Nullable SupervisorTaskAccess supervisorTaskAccess,
final DataSchema dataSchema,
final TaskLockHelper taskLockHelper,
- final boolean appendToExisting,
+ final AbstractTask.IngestionMode ingestionMode,
final PartitionsSpec partitionsSpec,
final @Nullable Boolean useLineageBasedSegmentAllocation
) throws IOException
{
- if (appendToExisting || taskLockHelper.isUseSegmentLock()) {
+    if (ingestionMode == AbstractTask.IngestionMode.APPEND || taskLockHelper.isUseSegmentLock()) {
return new OverlordCoordinatingSegmentAllocator(
toolbox,
sequenceName,
supervisorTaskAccess,
dataSchema,
taskLockHelper,
- appendToExisting,
+ ingestionMode,
partitionsSpec
);
} else {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
index 9b721ada5b..37b70c53ed 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
@@ -22,25 +22,23 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
{
- protected AbstractBatchSubtask(String id, String dataSource, Map<String,
Object> context)
- {
- super(id, dataSource, context);
- }
protected AbstractBatchSubtask(
String id,
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
- @Nullable Map<String, Object> context
+ @Nullable Map<String, Object> context,
+ @Nonnull IngestionMode ingestionMode
)
{
- super(id, groupId, taskResource, dataSource, context, -1);
+ super(id, groupId, taskResource, dataSource, context, -1, ingestionMode);
}
/**
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 02425ff91d..058b869dda 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -193,6 +193,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
private IngestionState ingestionState;
+
@JsonCreator
public ParallelIndexSupervisorTask(
@JsonProperty("id") String id,
@@ -220,25 +221,26 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
context,
- ingestionSchema.getTuningConfig().getMaxAllowedLockCount()
+ ingestionSchema.getTuningConfig().getMaxAllowedLockCount(),
+ computeBatchIngestionMode(ingestionSchema.getIOConfig())
);
this.ingestionSchema = ingestionSchema;
this.baseSubtaskSpecName = baseSubtaskSpecName == null ? getId() :
baseSubtaskSpecName;
-
- if (ingestionSchema.getIOConfig().isDropExisting() &&
+ if (getIngestionMode() == IngestionMode.REPLACE &&
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
{
- throw new ISE("GranularitySpec's intervals cannot be empty when setting
dropExisting to true.");
+ throw new ISE("GranularitySpec's intervals cannot be empty when using
replace.");
}
- if (isGuaranteedRollup(ingestionSchema.getIOConfig(),
ingestionSchema.getTuningConfig())) {
+ if (isGuaranteedRollup(getIngestionMode(),
ingestionSchema.getTuningConfig())) {
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
}
this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
ingestionSchema.getDataSchema().getParser()
);
- this.missingIntervalsInOverwriteMode =
!ingestionSchema.getIOConfig().isAppendToExisting()
+ this.missingIntervalsInOverwriteMode = (getIngestionMode()
+ != IngestionMode.APPEND)
&& ingestionSchema.getDataSchema()
.getGranularitySpec()
.inputIntervals()
@@ -427,13 +429,16 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Override
public boolean requireLockExistingSegments()
{
- return !ingestionSchema.getIOConfig().isAppendToExisting();
+ return getIngestionMode() != IngestionMode.APPEND;
}
@Override
public boolean isPerfectRollup()
{
- return isGuaranteedRollup(getIngestionSchema().getIOConfig(),
getIngestionSchema().getTuningConfig());
+ return isGuaranteedRollup(
+ getIngestionMode(),
+ getIngestionSchema().getTuningConfig()
+ );
}
@Nullable
@@ -451,6 +456,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
+
if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
!= TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS) {
LOG.warn("maxSavedParseExceptions is not supported yet");
@@ -489,8 +495,14 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
initializeSubTaskCleaner();
if (isParallelMode()) {
+ // emit metric for parallel batch ingestion mode:
+ emitMetric(toolbox.getEmitter(), "ingest/count", 1);
+
this.toolbox = toolbox;
- if (isGuaranteedRollup(ingestionSchema.getIOConfig(),
ingestionSchema.getTuningConfig())) {
+ if (isGuaranteedRollup(
+ getIngestionMode(),
+ ingestionSchema.getTuningConfig()
+ )) {
return runMultiPhaseParallel(toolbox);
} else {
return runSinglePhaseParallel(toolbox);
@@ -1108,8 +1120,9 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
ingestionSchema
);
- Set<DataSegment> tombStones;
- if (ingestionSchema.getIOConfig().isDropExisting()) {
+
+ Set<DataSegment> tombStones = Collections.emptySet();
+ if (getIngestionMode() == IngestionMode.REPLACE) {
TombstoneHelper tombstoneHelper = new TombstoneHelper(
newSegments,
ingestionSchema.getDataSchema(),
@@ -1127,8 +1140,11 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
);
tombstonesAnShards.put(interval, segmentIdWithShardSpec);
}
+
tombStones = tombstoneHelper.computeTombstones(tombstonesAnShards);
+ // add tombstones
newSegments.addAll(tombStones);
+
LOG.debugSegments(tombStones, "To publish tombstones");
}
}
@@ -1146,6 +1162,11 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
if (published) {
LOG.info("Published [%d] segments", newSegments.size());
+
+ // segment metrics:
+ emitMetric(toolbox.getEmitter(), "ingest/tombstones/count",
tombStones.size());
+ emitMetric(toolbox.getEmitter(), "ingest/segments/count",
newSegments.size());
+
} else {
throw new ISE("Failed to publish segments");
}
@@ -1675,7 +1696,10 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
if (isParallelMode()) {
- if (isGuaranteedRollup(ingestionSchema.getIOConfig(),
ingestionSchema.getTuningConfig())) {
+ if (isGuaranteedRollup(
+ getIngestionMode(),
+ ingestionSchema.getTuningConfig()
+ )) {
return doGetRowStatsAndUnparseableEventsParallelMultiPhase(
(ParallelIndexTaskRunner<?, ?>) currentRunner,
includeUnparseable
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
index 35e00b4136..4259922b43 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
@@ -52,7 +52,7 @@ abstract class PerfectRollupWorkerTask extends
AbstractBatchSubtask
@Nullable Map<String, Object> context
)
{
- super(id, groupId, taskResource, dataSchema.getDataSource(), context);
+ super(id, groupId, taskResource, dataSchema.getDataSource(), context,
IngestionMode.NONE);
Preconditions.checkArgument(
tuningConfig.isForceGuaranteedRollup(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 2970c509b7..0d47ea02d0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.IndexTask;
@@ -161,7 +162,8 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
groupId,
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
- context
+ context,
+ AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig())
);
if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
@@ -172,7 +174,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
- this.missingIntervalsInOverwriteMode =
!ingestionSchema.getIOConfig().isAppendToExisting()
+ this.missingIntervalsInOverwriteMode = getIngestionMode() != IngestionMode.APPEND
&& ingestionSchema.getDataSchema()
.getGranularitySpec()
.inputIntervals()
@@ -298,7 +300,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
@Override
public boolean requireLockExistingSegments()
{
- return !ingestionSchema.getIOConfig().isAppendToExisting();
+ return getIngestionMode() != IngestionMode.APPEND;
}
@Override
@@ -388,7 +390,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
getIngestionSchema().getDataSchema(),
getTaskLockHelper(),
- ingestionSchema.getIOConfig().isAppendToExisting(),
+ getIngestionMode(),
partitionsSpec,
useLineageBasedSegmentAllocation
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index b74569f7f3..f8502bac26 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -95,7 +95,8 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
groupId,
taskResource,
dataSchema.getDataSource(),
- context
+ context,
+ IngestionMode.APPEND
);
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig,
"tuningConfig");
@@ -146,6 +147,7 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
@Override
public TaskStatus run(final TaskToolbox toolbox)
{
+ emitMetric(toolbox.getEmitter(), "ingest/count", 1);
return getRunner().run(toolbox);
}
@@ -261,7 +263,6 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
);
}
}
-
return !beforeMinimumMessageTime && !afterMaximumMessageTime;
}
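[Editor's note: the commit message's "Avoid null ptr exc when null emitter" shows up in the hunks above as metric emission that must tolerate a missing emitter. A hedged sketch of that pattern follows; the real `emitMetric` takes a ServiceEmitter and builds a ServiceMetricEvent, which this standalone version only simulates.]

```java
// Sketch of null-tolerant metric emission; signature and return value are
// assumptions for illustration, not Druid's actual API.
public class EmitSketch
{
  // Returns true when a metric would be emitted, false when skipped.
  public static boolean emitMetric(Object emitter, String metric, long value)
  {
    if (emitter == null || metric == null) {
      // Tasks can run without an emitter configured; skip quietly
      // instead of throwing a NullPointerException.
      return false;
    }
    // Real code would build and emit a ServiceMetricEvent here.
    return true;
  }
}
```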
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index dce3b09aa7..6d55f841a5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1025,6 +1025,17 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
);
+ // emit segment count metric:
+ int segmentCount = 0;
+ if (publishedSegmentsAndCommitMetadata != null
+ && publishedSegmentsAndCommitMetadata.getSegments() != null) {
+ segmentCount =
publishedSegmentsAndCommitMetadata.getSegments().size();
+ }
+ task.emitMetric(
+ toolbox.getEmitter(),
"ingest/segments/count",
+ segmentCount
+ );
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
new file mode 100644
index 0000000000..dc3202c4ff
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AbstractTaskTest
+{
+
+ @Test
+ public void testBatchIOConfigAppend()
+ {
+ AbstractTask.IngestionMode ingestionMode =
AbstractTask.IngestionMode.fromString("APPEND");
+ Assert.assertEquals(AbstractTask.IngestionMode.APPEND, ingestionMode);
+ }
+
+ @Test
+ public void testBatchIOConfigReplace()
+ {
+ AbstractTask.IngestionMode ingestionMode =
AbstractTask.IngestionMode.fromString("REPLACE");
+ Assert.assertEquals(AbstractTask.IngestionMode.REPLACE, ingestionMode);
+ }
+
+ @Test
+ public void testBatchIOConfigOverwrite()
+ {
+ AbstractTask.IngestionMode ingestionMode =
AbstractTask.IngestionMode.fromString("REPLACE_LEGACY");
+ Assert.assertEquals(AbstractTask.IngestionMode.REPLACE_LEGACY,
ingestionMode);
+ }
+
+ @Test
+ public void testBatchIOConfigHadoop()
+ {
+ AbstractTask.IngestionMode ingestionMode =
AbstractTask.IngestionMode.fromString("HADOOP");
+ Assert.assertEquals(AbstractTask.IngestionMode.HADOOP, ingestionMode);
+ }
+
+ @Test
+ public void testBatchIOConfigNone()
+ {
+ AbstractTask.IngestionMode ingestionMode =
AbstractTask.IngestionMode.fromString("NONE");
+ Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode);
+ }
+
+}
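[Editor's note: the new AbstractTaskTest above exercises `AbstractTask.IngestionMode.fromString`. A standalone approximation of that behavior follows; the real enum lives in AbstractTask, and the null-to-NONE fallback here is an assumption for illustration.]

```java
import java.util.Locale;

// Approximation of IngestionMode.fromString as exercised by the tests
// above; null handling is assumed, not confirmed from the diff.
public class IngestionModeSketch
{
  public enum IngestionMode
  {
    REPLACE, APPEND, REPLACE_LEGACY, HADOOP, NONE;

    public static IngestionMode fromString(String name)
    {
      if (name == null) {
        return NONE; // assumed fallback for illustration
      }
      // Accept any casing of the enum constant name.
      return valueOf(name.toUpperCase(Locale.ENGLISH));
    }
  }
}
```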
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index a0f85232f1..b88cf10a3a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -82,6 +82,8 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
@@ -112,8 +114,8 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.IOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -399,6 +401,27 @@ public class CompactionTaskTest
);
}
+ @Test
+ public void testCompactionTaskEmitter()
+ {
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentCacheManagerFactory,
+ RETRY_POLICY_FACTORY
+ );
+ builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder.tuningConfig(createTuningConfig());
+ builder.segmentGranularity(Granularities.HOUR);
+ final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
+
+ // null emitter should work
+ taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(null,
false);
+ // non-null should also work
+ ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new
NoopEmitter());
+ taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, false);
+ taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, true);
+ }
+
@Test(expected = IAE.class)
public void
testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE()
{
@@ -898,7 +921,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -916,7 +939,7 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -972,7 +995,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -991,7 +1014,7 @@ public class CompactionTaskTest
tuningConfig,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1047,7 +1070,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1066,7 +1089,7 @@ public class CompactionTaskTest
tuningConfig,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1122,7 +1145,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1141,7 +1164,7 @@ public class CompactionTaskTest
tuningConfig,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1187,7 +1210,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
ingestionSpecs.sort(
@@ -1206,7 +1229,7 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1232,7 +1255,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1251,7 +1274,7 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1270,7 +1293,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1288,7 +1311,7 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1314,7 +1337,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1339,7 +1362,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1376,7 +1399,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
@@ -1396,7 +1419,7 @@ public class CompactionTaskTest
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null),
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1415,7 +1438,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1433,7 +1456,7 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
Granularities.MONTH,
new PeriodGranularity(Period.months(3), null, null),
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1456,7 +1479,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
@@ -1476,7 +1499,7 @@ public class CompactionTaskTest
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null),
new PeriodGranularity(Period.months(3), null, null),
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1495,7 +1518,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1513,7 +1536,7 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1533,7 +1556,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1551,7 +1574,7 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
Granularities.MONTH,
Granularities.NONE,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1571,7 +1594,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
Assert.assertEquals(6, ingestionSpecs.size());
@@ -1596,7 +1619,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
- IOConfig.DEFAULT_DROP_EXISTING
+ BatchIOConfig.DEFAULT_DROP_EXISTING
);
Assert.assertEquals(6, ingestionSpecs.size());
for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
@@ -1913,7 +1936,10 @@ public class CompactionTaskTest
// assert ioConfig
final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting());
- Assert.assertEquals(expectedDropExisting, ioConfig.isDropExisting());
+ Assert.assertEquals(
+ expectedDropExisting,
+ ioConfig.isDropExisting()
+ );
final InputSource inputSource = ioConfig.getInputSource();
Assert.assertTrue(inputSource instanceof DruidInputSource);
final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
@@ -2245,7 +2271,7 @@ public class CompactionTaskTest
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
- super(getOrMakeId(id, "compact", dataSource), null, taskResource,
dataSource, context);
+ super(getOrMakeId(id, "compact", dataSource), null, taskResource,
dataSource, context, IngestionMode.REPLACE_LEGACY);
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 856de0c105..babc3ff1fd 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -858,7 +858,7 @@ public class IndexTaskTest extends IngestionTestBase
// Expect exception if reingest with dropExisting and null intervals is
attempted
expectedException.expect(IAE.class);
expectedException.expectMessage(
- "GranularitySpec's intervals cannot be empty when setting dropExisting
to true."
+ "GranularitySpec's intervals cannot be empty when using replace."
);
IndexTask indexTask = new IndexTask(
null,
@@ -2700,7 +2700,7 @@ public class IndexTaskTest extends IngestionTestBase
{
expectedException.expect(IAE.class);
expectedException.expectMessage(
- "Cannot both drop existing segments and append to existing segments.
Either dropExisting or appendToExisting should be set to false"
+ "Cannot simultaneously replace and append to existing segments. Either
dropExisting or appendToExisting should be set to false"
);
new IndexTask(
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 20c7eb4fc2..499ba22bb1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -94,6 +94,7 @@ public class TaskSerdeTest
);
Assert.assertEquals(false, ioConfig.isAppendToExisting());
+ Assert.assertEquals(false, ioConfig.isDropExisting());
}
@Test
@@ -293,6 +294,7 @@ public class TaskSerdeTest
Assert.assertTrue(taskIoConfig.getInputSource() instanceof
LocalInputSource);
Assert.assertTrue(task2IoConfig.getInputSource() instanceof
LocalInputSource);
Assert.assertEquals(taskIoConfig.isAppendToExisting(),
task2IoConfig.isAppendToExisting());
+ Assert.assertEquals(taskIoConfig.isDropExisting(),
task2IoConfig.isDropExisting());
IndexTask.IndexTuningConfig taskTuningConfig =
task.getIngestionSchema().getTuningConfig();
IndexTask.IndexTuningConfig task2TuningConfig =
task2.getIngestionSchema().getTuningConfig();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 1b6e858c29..a187e9df8d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -559,7 +559,7 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
baseInputSource.withSplit(split),
getIngestionSchema().getIOConfig().getInputFormat(),
getIngestionSchema().getIOConfig().isAppendToExisting(),
- null
+ getIngestionSchema().getIOConfig().isDropExisting()
),
getIngestionSchema().getTuningConfig()
),
@@ -684,7 +684,7 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
getIngestionSchema().getDataSchema(),
getTaskLockHelper(),
- getIngestionSchema().getIOConfig().isAppendToExisting(),
+ AbstractTask.computeBatchIngestionMode(getIngestionSchema().getIOConfig()),
partitionsSpec,
true
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index c22c25531d..71b474e773 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -164,7 +164,7 @@ public class PerfectRollupWorkerTaskTest
@Override
public String getType()
{
- throw new UnsupportedOperationException();
+ return "TestPerfectRollupWorkerTask";
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index dc5fc0e1c6..a90ad8b3c0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -362,7 +362,7 @@ public class TaskQueueTest extends IngestionTestBase
private TestTask(String id, Interval interval, Map<String, Object> context)
{
- super(id, "datasource", context);
+ super(id, "datasource", context, IngestionMode.NONE);
this.interval = interval;
}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index bb3519d84e..5e6bcb2d5f 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -34,6 +34,7 @@ public class DruidMetrics
public static final String ID = "id";
public static final String TASK_ID = "taskId";
public static final String STATUS = "status";
+ public static final String TASK_INGESTION_MODE = "taskIngestionMode";
public static final String PARTITIONING_TYPE = "partitioningType";
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
index 0419bc2b5f..348d9d22da 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -21,7 +21,7 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.indexing.IOConfig;
+import org.apache.druid.segment.indexing.BatchIOConfig;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -45,7 +45,7 @@ public class ClientCompactionIOConfig
)
{
this.inputSpec = inputSpec;
- this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING
: dropExisting;
+ this.dropExisting = dropExisting == null ?
BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
}
@JsonProperty
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java
b/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java
index b05b38145a..946f1bca61 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.indexing;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
@@ -27,11 +28,17 @@ import org.apache.druid.data.input.InputSource;
*/
public interface BatchIOConfig extends IOConfig
{
+ boolean DEFAULT_DROP_EXISTING = false;
+ boolean DEFAULT_APPEND_EXISTING = false;
+
InputSource getInputSource();
InputFormat getInputFormat();
+ @JsonProperty
boolean isAppendToExisting();
+ @JsonProperty
boolean isDropExisting();
}
+
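The hunks above move `DEFAULT_DROP_EXISTING` from `IOConfig` down to `BatchIOConfig` and add `DEFAULT_APPEND_EXISTING`. A minimal standalone sketch of the null-coalescing default pattern these constants back (the `Sketch` class below is hypothetical and not part of the patch; it mirrors the constructors of `ClientCompactionIOConfig` and `UserCompactionTaskIOConfig`):

```java
public class DropExistingDefaultSketch
{
  // Mirrors BatchIOConfig.DEFAULT_DROP_EXISTING from the hunk above.
  static final boolean DEFAULT_DROP_EXISTING = false;

  final boolean dropExisting;

  DropExistingDefaultSketch(Boolean dropExisting)
  {
    // A null (absent) JSON property falls back to the interface-level default.
    this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
  }

  public static void main(String[] args)
  {
    System.out.println(new DropExistingDefaultSketch(null).dropExisting);  // false
    System.out.println(new DropExistingDefaultSketch(true).dropExisting);  // true
  }
}
```

Hosting the default on `BatchIOConfig` rather than `IOConfig` keeps batch-only semantics out of the shared interface that streaming IO configs also implement.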
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
index 58f84c2bc4..b1784806e7 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
@@ -30,5 +30,4 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
})
public interface IOConfig
{
- boolean DEFAULT_DROP_EXISTING = false;
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
index df5af287ac..a57104683b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.indexing.IOConfig;
import javax.annotation.Nullable;
@@ -43,7 +44,7 @@ public class UserCompactionTaskIOConfig
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
-    this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting;
+    this.dropExisting = dropExisting == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
}
@JsonProperty
diff --git a/website/.spelling b/website/.spelling
index fd16c89754..44b7fc3349 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1468,6 +1468,7 @@ poolName
remoteAddress
segmentAvailabilityConfirmed
serviceName
+taskIngestionMode
taskStatus
taskType
threadPoolNumBusyThreads.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]