This is an automated email from the ASF dual-hosted git repository.
gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d22f28277fa feat: Native Minor compaction (#19016)
d22f28277fa is described below
commit d22f28277fa79a46969e4ad8ad91bff3c756d28d
Author: Virushade <[email protected]>
AuthorDate: Thu Mar 26 15:11:59 2026 +0800
feat: Native Minor compaction (#19016)
* Test Driven Dev
CompactionTaskTest
TaskLockHelperTest
* Minor Compaction Impl
* Deprecated fixes
* Check minor compaction
* Minor compaction impl
* Documentations
* Tidy up segmentProvider
* We no longer need the context key
* Bug fixes
* Trim up unnecessary code
* Docs and stylecheck
* Checkstyle
* Documentations first
* Integrate minor compaction spec
* Checkstyle
* Docs
* Tidy up changes
* Revert changes to SpecificSegmentsSpec
* Spell check
* Force time chunk lock only for minor compaction
* Docs
* Minor compaction test
* Native minor compaction add in default values
* Junit5 for TaskLockHelperTest
* ImmutableXXX.of() -> XXX.of() methods in Test
* Validate minor compaction task configs
* Compaction IO Config creation refactor
* Validations
* Test fixes
* Remove test asserting finding segments to lock
* Validation tests
* Rename uncompacted -> minor compaction
* Checkstyle
* Solve validation errors in InputSpecTest
* Change name to use 'minor' for compaction
* Minor compaction input spec fix
* Revert unintended changes in Druid.xml
* Remove uncompacted spellcheck exclusion
* Minor compaction constructor should validate the lock
* Nitpicked test name change
* Remove unnecessary validation
* Partial Compaction Testing
* Checkstyle
* Apply suggestion from @kfaraz
Co-authored-by: Kashif Faraz <[email protected]>
* Change ImmutableMap.of moments to Map.of
---------
Co-authored-by: Kashif Faraz <[email protected]>
---
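Note (not part of the commit): the documentation change below lists two requirements for minor compaction, `dropExisting: true` in the `ioConfig` and `useConcurrentLocks: true` in the task context. As a minimal sketch, assuming the usual compaction task layout (`type`, `dataSource`, `ioConfig`, `context`), a complete task payload might look like the following. The datasource name `wikipedia` is a placeholder, and the segment descriptor is copied from the documentation example in this push.

```json
{
  "type": "compact",
  "dataSource": "wikipedia",
  "ioConfig": {
    "type": "compact",
    "inputSpec": {
      "type": "minor",
      "interval": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
      "segments": [
        {
          "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
          "ver": "2020-01-01T00:07:18.186Z",
          "part": 0
        }
      ]
    },
    "dropExisting": true
  },
  "context": {
    "useConcurrentLocks": true
  }
}
```

Going by the validation added to the `CompactionTask` constructor in this commit, a `minor` spec is rejected with an `INVALID_INPUT` error when `dropExisting` is not `true` or when `useConcurrentLocks` does not resolve to `true`.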
docs/data-management/manual-compaction.md | 54 +++-
.../druid/indexing/common/task/CompactionTask.java | 30 ++-
.../common/task/MinorCompactionInputSpec.java | 30 +--
.../common/task/NativeCompactionRunner.java | 103 +++++++-
.../indexing/common/task/SpecificSegmentsSpec.java | 4 +
.../indexing/common/task/CompactionTaskTest.java | 294 +++++++++++++++++++--
.../common/task/MinorCompactionInputSpecTest.java | 10 +-
.../indexing/common/task/TaskLockHelperTest.java | 224 ++++++++++++++++
.../task/batch/parallel/PartialCompactionTest.java | 156 ++++++++++-
.../indexing/ClientMinorCompactionInputSpec.java | 24 +-
.../indexing/ClientCompactionIntervalSpecTest.java | 6 +-
11 files changed, 849 insertions(+), 86 deletions(-)
diff --git a/docs/data-management/manual-compaction.md b/docs/data-management/manual-compaction.md
index e6e34dba82e..4c13d4b40f2 100644
--- a/docs/data-management/manual-compaction.md
+++ b/docs/data-management/manual-compaction.md
@@ -117,9 +117,9 @@ The compaction `ioConfig` requires specifying `inputSpec`
as follows:
|Field|Description|Default|Required|
|-----|-----------|-------|--------|
|`type`|Task type. Set the value to `compact`.|none|Yes|
-|`inputSpec`|Specification of the target [interval](#interval-inputspec) or
[segments](#segments-inputspec).|none|Yes|
-|`dropExisting`|If `true`, the task replaces all existing segments fully
contained by either of the following:<br />- the `interval` in the `interval`
type `inputSpec`.<br />- the umbrella interval of the `segments` in the
`segment` type `inputSpec`.<br />If compaction fails, Druid does not change any
of the existing segments.<br />**WARNING**: `dropExisting` in `ioConfig` is a
beta feature. |false|No|
-|`allowNonAlignedInterval`|If `true`, the task allows an explicit
[`segmentGranularity`](#compaction-granularity-spec) that is not aligned with
the provided [interval](#interval-inputspec) or
[segments](#segments-inputspec). This parameter is only used if
[`segmentGranularity`](#compaction-granularity-spec) is explicitly provided.<br
/><br />This parameter is provided for backwards compatibility. In most
scenarios it should not be set, as it can lead to data being accidentally
overshadow [...]
+|`inputSpec`|Specification of the target [interval](#interval-inputspec) or
[minor](#minor-inputspec).|none|Yes|
+|`dropExisting`|If `true`, the task replaces all existing segments fully
contained by the `interval` in the `interval` type `inputSpec`. If compaction
fails, Druid does not change any of the existing segments.<br />**WARNING**:
`dropExisting` in `ioConfig` is a beta feature. |false|No|
+|`allowNonAlignedInterval`|If `true`, the task allows an explicit
[`segmentGranularity`](#compaction-granularity-spec) that is not aligned with
the provided [interval](#interval-inputspec) or [minor](#minor-inputspec). This
parameter is only used if [`segmentGranularity`](#compaction-granularity-spec)
is explicitly provided.<br /><br />This parameter is provided for backwards
compatibility. In most scenarios it should not be set, as it can lead to data
being accidentally overshadowed. Th [...]
The compaction task has two kinds of `inputSpec`:
@@ -127,15 +127,55 @@ The compaction task has two kinds of `inputSpec`:
|Field|Description|Required|
|-----|-----------|--------|
-|`type`|Task type. Set the value to `interval`.|Yes|
+|`type`|Task type. Set the value to `interval` to trigger major
compaction.|Yes|
|`interval`|Interval to compact.|Yes|
-### Segments `inputSpec`
+### Minor `inputSpec`
|Field|Description|Required|
|-----|-----------|--------|
-|`type`|Task type. Set the value to `segments`.|Yes|
-|`segments`|A list of segment IDs.|Yes|
+|`type`|Task type. Set the value to `minor` to trigger minor compaction.|Yes|
+|`interval`|Interval to compact.|Yes|
+|`segments`|A list of segment descriptors.|Yes|
+
+The required segment descriptor fields can be retrieved from the "Segments"
section in the web console.
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`itvl`|Interval of segment to compact.|Yes|
+|`ver`|Version of the segment.|Yes|
+|`part`|Partition number of the segment.|Yes|
+
+#### Example minor inputSpec
+
+```json
+{
+ "type": "minor",
+ "interval": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+ "segments": [
+ {
+ "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+ "ver": "2020-01-01T00:07:18.186Z",
+ "part": 0
+ },
+ {
+ "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+ "ver": "2020-01-01T00:07:18.186Z",
+ "part": 1
+ }
+ ]
+}
+```
+
+When using the minor `inputSpec`, the task compacts only the specified
segments. Segments in the same interval that are not in the spec are upgraded
in place rather than compacted. This allows compacting a subset of segments
while preserving others.
+
+There are some requirements when triggering a minor compaction:
+- Set `useConcurrentLocks: true` in the task context. Minor compaction uses
REPLACE locks over the entire interval.
+- Set `dropExisting: true` in the ioConfig.
+
+### Segment `inputSpec`
+
+The `segments` type `inputSpec` is deprecated and no longer documented. Use the `interval` or `minor` type `inputSpec` instead.
## Compaction dimensions spec
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 6cbdb1dc6a4..82310272b36 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -223,6 +223,23 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
//noinspection ConstantConditions
this.ioConfig = new
CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), false, null);
}
+
+ if (ioConfig != null && ioConfig.getInputSpec() != null &&
ioConfig.getInputSpec() instanceof MinorCompactionInputSpec) {
+ if (computeCompactionIngestionMode(ioConfig) != IngestionMode.REPLACE) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Minor compaction can only be used with REPLACE ingestion mode. Please set ioConfig[dropExisting] to true.");
+ }
+
+ boolean usingConcurrentLocks =
this.getContextValue(Tasks.USE_CONCURRENT_LOCKS,
Tasks.DEFAULT_USE_CONCURRENT_LOCKS);
+
+ if (!usingConcurrentLocks) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Minor compaction requires concurrent locks. Please set context[%s] to true.", Tasks.USE_CONCURRENT_LOCKS);
+ }
+ }
+
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.transformSpec = transformSpec;
this.metricsSpec = metricsSpec;
@@ -1284,6 +1301,11 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
}
}
+ /**
+ * Provides segment discovery and validation for compaction.
+ * For minor compaction (MinorCompactionInputSpec), finds all segments
+ * in the interval and partitions them into 'compact and upgrade metadata'
vs 'upgrade metadata only' via {@link #shouldUpgradeSegment}.
+ */
@VisibleForTesting
static class SegmentProvider
{
@@ -1292,7 +1314,7 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
private final Interval interval;
private final boolean minorCompaction;
- private final Set<SegmentDescriptor> uncompactedSegments;
+ private final Set<SegmentDescriptor> segmentsToCompact;
SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
{
@@ -1301,17 +1323,17 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
this.interval = inputSpec.findInterval(dataSource);
if (inputSpec instanceof MinorCompactionInputSpec) {
minorCompaction = true;
- uncompactedSegments = Set.copyOf(((MinorCompactionInputSpec)
inputSpec).getUncompactedSegments());
+ segmentsToCompact = Set.copyOf(((MinorCompactionInputSpec)
inputSpec).getSegments());
} else {
minorCompaction = false;
- uncompactedSegments = null;
+ segmentsToCompact = null;
}
}
private boolean shouldUpgradeSegment(DataSegment s)
{
if (minorCompaction) {
- return !uncompactedSegments.contains(s.toDescriptor()) &&
this.interval.contains(s.getInterval());
+ return !segmentsToCompact.contains(s.toDescriptor()) &&
this.interval.contains(s.getInterval());
} else {
return false;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
index 0176386bf07..16fbcd4e5ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
@@ -38,40 +38,40 @@ import java.util.stream.Collectors;
*/
public class MinorCompactionInputSpec implements CompactionInputSpec
{
- public static final String TYPE = "uncompacted";
+ public static final String TYPE = "minor";
private final Interval interval;
- private final List<SegmentDescriptor> uncompactedSegments;
+ private final List<SegmentDescriptor> segments;
@JsonCreator
public MinorCompactionInputSpec(
@JsonProperty("interval") Interval interval,
- @JsonProperty("uncompactedSegments") List<SegmentDescriptor>
uncompactedSegments
+ @JsonProperty("segments") List<SegmentDescriptor> segments
)
{
- InvalidInput.conditionalException(interval != null, "Uncompacted interval
must not be null");
+ InvalidInput.conditionalException(interval != null, "Minor compaction
interval must not be null");
InvalidInput.conditionalException(
interval.toDurationMillis() > 0,
- "Uncompacted interval[%s] is empty, must specify a nonempty interval",
+ "Minor compaction interval[%s] is empty, must specify a nonempty
interval",
interval
);
InvalidInput.conditionalException(
- uncompactedSegments != null && !uncompactedSegments.isEmpty(),
- "Uncompacted segments must not be null or empty"
+ segments != null && !segments.isEmpty(),
+ "Minor compaction specified segments must not be null or empty"
);
// Validate that all segments are within the interval
List<SegmentDescriptor> segmentsNotInInterval =
- uncompactedSegments.stream().filter(s ->
!interval.contains(s.getInterval())).collect(Collectors.toList());
+ segments.stream().filter(s ->
!interval.contains(s.getInterval())).collect(Collectors.toList());
InvalidInput.conditionalException(
segmentsNotInInterval.isEmpty(),
- "All uncompacted segments must be within interval[%s], got segments
outside interval: %s",
+ "All segments must be within interval[%s], got segments outside
interval: %s",
interval,
segmentsNotInInterval
);
this.interval = interval;
- this.uncompactedSegments = uncompactedSegments;
+ this.segments = segments;
}
@JsonProperty
@@ -81,9 +81,9 @@ public class MinorCompactionInputSpec implements
CompactionInputSpec
}
@JsonProperty
- public List<SegmentDescriptor> getUncompactedSegments()
+ public List<SegmentDescriptor> getSegments()
{
- return uncompactedSegments;
+ return segments;
}
@Override
@@ -112,13 +112,13 @@ public class MinorCompactionInputSpec implements
CompactionInputSpec
}
MinorCompactionInputSpec that = (MinorCompactionInputSpec) o;
return Objects.equals(interval, that.interval) &&
- Objects.equals(uncompactedSegments, that.uncompactedSegments);
+ Objects.equals(segments, that.segments);
}
@Override
public int hashCode()
{
- return Objects.hash(interval, uncompactedSegments);
+ return Objects.hash(interval, segments);
}
@Override
@@ -126,7 +126,7 @@ public class MinorCompactionInputSpec implements
CompactionInputSpec
{
return "MinorCompactionInputSpec{" +
"interval=" + interval +
- ", uncompactedSegments=" + uncompactedSegments +
+ ", segments=" + segments +
'}';
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index afde45cdb13..74a9b0aaef4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -35,20 +35,24 @@ import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfi
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.indexing.input.WindowedSegmentId;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -99,9 +103,14 @@ public class NativeCompactionRunner implements
CompactionRunner
"Virtual columns in filter rules are not supported by the Native
compaction engine. Use MSQ compaction engine instead."
);
}
+
if (compactionTask.getIoConfig().getInputSpec() instanceof
MinorCompactionInputSpec) {
- return CompactionConfigValidationResult.failure(
- "Minor compaction is not supported by Native compaction engine. Use
MSQ compaction engine instead.");
+ boolean usingConcurrentLocks =
compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS,
Tasks.DEFAULT_USE_CONCURRENT_LOCKS);
+ if (!usingConcurrentLocks) {
+ return CompactionConfigValidationResult.failure(
+ "Task context[%s] must be true when using native minor
compaction", Tasks.USE_CONCURRENT_LOCKS
+ );
+ }
}
return CompactionConfigValidationResult.success();
}
@@ -153,19 +162,32 @@ public class NativeCompactionRunner implements
CompactionRunner
SegmentCacheManagerFactory segmentCacheManagerFactory,
CompactionIOConfig compactionIOConfig
)
+ {
+ return (compactionIOConfig.getInputSpec() instanceof
MinorCompactionInputSpec)
+ ? createMinorCompactionIoConfig(toolbox, dataSchema, interval,
coordinatorClient, segmentCacheManagerFactory, compactionIOConfig)
+ : createMajorCompactionIoConfig(toolbox, dataSchema, interval,
coordinatorClient, segmentCacheManagerFactory, compactionIOConfig);
+ }
+
+ private static ParallelIndexIOConfig createMajorCompactionIoConfig(
+ TaskToolbox toolbox,
+ DataSchema dataSchema,
+ Interval inputInterval,
+ CoordinatorClient coordinatorClient,
+ SegmentCacheManagerFactory segmentCacheManagerFactory,
+ CompactionIOConfig compactionIOConfig
+ )
{
if (!compactionIOConfig.isAllowNonAlignedInterval()) {
- // Validate interval alignment.
final Granularity segmentGranularity =
dataSchema.getGranularitySpec().getSegmentGranularity();
final Interval widenedInterval = Intervals.utc(
- segmentGranularity.bucketStart(interval.getStart()).getMillis(),
- segmentGranularity.bucketEnd(interval.getEnd().minus(1)).getMillis()
+ segmentGranularity.bucketStart(inputInterval.getStart()).getMillis(),
+
segmentGranularity.bucketEnd(inputInterval.getEnd().minus(1)).getMillis()
);
- if (!interval.equals(widenedInterval)) {
+ if (!inputInterval.equals(widenedInterval)) {
throw new IAE(
"Interval[%s] to compact is not aligned with
segmentGranularity[%s]",
- interval,
+ inputInterval,
segmentGranularity
);
}
@@ -174,7 +196,7 @@ public class NativeCompactionRunner implements
CompactionRunner
return new ParallelIndexIOConfig(
new DruidInputSource(
dataSchema.getDataSource(),
- interval,
+ inputInterval,
null,
null,
null,
@@ -190,6 +212,67 @@ public class NativeCompactionRunner implements
CompactionRunner
);
}
+ private static ParallelIndexIOConfig createMinorCompactionIoConfig(
+ TaskToolbox toolbox,
+ DataSchema dataSchema,
+ Interval interval,
+ CoordinatorClient coordinatorClient,
+ SegmentCacheManagerFactory segmentCacheManagerFactory,
+ CompactionIOConfig compactionIOConfig
+ )
+ {
+ final List<WindowedSegmentId> segmentIds =
resolveSegmentIdsForMinorCompaction(
+ (MinorCompactionInputSpec) compactionIOConfig.getInputSpec(),
+ dataSchema.getDataSource(),
+ interval
+ );
+
+ return new ParallelIndexIOConfig(
+ new DruidInputSource(
+ dataSchema.getDataSource(),
+ null,
+ segmentIds,
+ null,
+ null,
+ null,
+ toolbox.getIndexIO(),
+ coordinatorClient,
+ segmentCacheManagerFactory,
+ toolbox.getConfig()
+ ).withTaskToolbox(toolbox),
+ null,
+ false,
+ compactionIOConfig.isDropExisting()
+ );
+ }
+
+ /**
+ * When using {@link MinorCompactionInputSpec}, resolves segment descriptors
to compact that belong
+ * to the given interval and returns them as {@link WindowedSegmentId}
objects.
+ */
+ private static List<WindowedSegmentId> resolveSegmentIdsForMinorCompaction(
+ MinorCompactionInputSpec inputSpec,
+ String dataSource,
+ Interval interval
+ )
+ {
+ final List<WindowedSegmentId> segmentIds = new ArrayList<>();
+ for (SegmentDescriptor desc : inputSpec.getSegments()) {
+ if (interval.contains(desc.getInterval())) {
+ final SegmentId segmentId = SegmentId.of(
+ dataSource,
+ desc.getInterval(),
+ desc.getVersion(),
+ desc.getPartitionNumber()
+ );
+ segmentIds.add(
+ new WindowedSegmentId(segmentId.toString(),
List.of(desc.getInterval()))
+ );
+ }
+ }
+ return segmentIds;
+ }
+
@Override
public TaskStatus runCompactionTasks(
CompactionTask compactionTask,
@@ -318,6 +401,10 @@ public class NativeCompactionRunner implements
CompactionRunner
newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY,
STORE_COMPACTION_STATE);
// Set the priority of the compaction task.
newContext.put(Tasks.PRIORITY_KEY, compactionTask.getPriority());
+ // Native minor compaction uses REPLACE ingestion mode, which uses time
chunk lock.
+ if (compactionTask.getIoConfig().getInputSpec() instanceof
MinorCompactionInputSpec) {
+ newContext.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
+ }
return newContext;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
index 7b26c22a4a5..93f181d0dca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
@@ -33,6 +33,10 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
+/**
+ * @deprecated Use {@link MinorCompactionInputSpec} for minor compaction
instead.
+ */
+@Deprecated
public class SpecificSegmentsSpec implements CompactionInputSpec
{
public static final String TYPE = "segments";
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 3a51d5b9fe2..10c27c0b268 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -29,9 +29,6 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -50,6 +47,7 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
@@ -62,9 +60,12 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
+import org.apache.druid.indexing.common.actions.MarkSegmentToUpgradeAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionTestKit;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
@@ -73,6 +74,7 @@ import
org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import
org.apache.druid.indexing.common.task.NativeCompactionRunner.PartitionConfigurationManager;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -97,6 +99,7 @@ import
org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFa
import
org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexIO;
@@ -186,7 +189,7 @@ public class CompactionTaskTest
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
private static final Interval COMPACTION_INTERVAL =
Intervals.of("2017-01-01/2017-07-01");
- private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
+ private static final List<Interval> SEGMENT_INTERVALS = List.of(
Intervals.of("2017-01-01/2017-02-01"),
Intervals.of("2017-02-01/2017-03-01"),
Intervals.of("2017-03-01/2017-04-01"),
@@ -271,7 +274,7 @@ public class CompactionTaskTest
DATA_SOURCE,
SEGMENT_INTERVALS.get(i),
"version_" + i,
- ImmutableMap.of(),
+ Map.of(),
findDimensions(i, SEGMENT_INTERVALS.get(i)),
AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()),
new NumberedShardSpec(0, 1),
@@ -299,7 +302,7 @@ public class CompactionTaskTest
);
GuiceInjectableValues injectableValues = new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
- ImmutableList.of(
+ List.of(
binder -> {
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new
NoopChatHandlerProvider());
@@ -359,6 +362,9 @@ public class CompactionTaskTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
+
private StubServiceEmitter emitter;
@Before
@@ -520,7 +526,7 @@ public class CompactionTaskTest
new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS))
)
.tuningConfig(createTuningConfig())
- .context(ImmutableMap.of("testKey", "testContext"))
+ .context(Map.of("testKey", "testContext"))
.build();
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
@@ -538,7 +544,7 @@ public class CompactionTaskTest
final CompactionTask task = builder
.segments(SEGMENTS)
.tuningConfig(createTuningConfig())
- .context(ImmutableMap.of("testKey", "testContext"))
+ .context(Map.of("testKey", "testContext"))
.build();
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
@@ -558,7 +564,7 @@ public class CompactionTaskTest
.segments(SEGMENTS)
.dimensionsSpec(
new DimensionsSpec(
- ImmutableList.of(
+ List.of(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim2"),
new StringDimensionSchema("dim3")
@@ -566,7 +572,7 @@ public class CompactionTaskTest
)
)
.tuningConfig(createTuningConfig())
- .context(ImmutableMap.of("testKey", "testVal"))
+ .context(Map.of("testKey", "testVal"))
.build();
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
@@ -675,7 +681,7 @@ public class CompactionTaskTest
new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS))
)
.tuningConfig(createTuningConfig())
- .context(ImmutableMap.of("testKey", "testContext"))
+ .context(Map.of("testKey", "testContext"))
.build();
Assert.assertTrue(task.getInputSourceResources().isEmpty());
@@ -783,7 +789,7 @@ public class CompactionTaskTest
expectedException.expectMessage(
"No segments found for compaction. Please check that datasource name
and interval are correct."
);
- provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of());
+ provider.checkSegments(LockGranularity.TIME_CHUNK, List.of());
}
@Test
@@ -1233,11 +1239,10 @@ public class CompactionTaskTest
final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO();
indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(),
null));
- final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
final Map<QuerySegmentSpec, DataSchema> inputSchemas =
CompactionTask.createInputDataSchemas(
toolbox,
LockGranularity.TIME_CHUNK,
- new SegmentProvider(DATA_SOURCE,
SpecificSegmentsSpec.fromSegments(segments)),
+ new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
null,
null,
null,
@@ -1298,7 +1303,7 @@ public class CompactionTaskTest
COORDINATOR_CLIENT,
segmentCacheManagerFactory
);
- final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
+ final List<DimensionsSpec> expectedDimensionsSpec = List.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
);
@@ -1393,7 +1398,7 @@ public class CompactionTaskTest
);
- final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
+ final List<DimensionsSpec> expectedDimensionsSpec = List.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
);
@@ -1626,7 +1631,7 @@ public class CompactionTaskTest
new
DimensionRangePartitionsSpec(
3,
null,
- ImmutableList.of(
+ List.of(
"string_dim_1"),
false
))
@@ -1647,7 +1652,7 @@ public class CompactionTaskTest
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null,
null, true));
DimensionSchema stringDim = new StringDimensionSchema("string_dim_1",
null, null);
- builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+ builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim)));
final CompactionTask compactionTask = builder.build();
// A string dimension with rollup=true should need MVD info
Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
@@ -1670,12 +1675,12 @@ public class CompactionTaskTest
new
DimensionRangePartitionsSpec(
3,
null,
- ImmutableList.of(
+ List.of(
stringDim.getName()),
false
))
.build());
- builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+ builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim)));
CompactionTask compactionTask = builder.build();
Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
}
@@ -1702,7 +1707,7 @@ public class CompactionTaskTest
)
)
.build());
- builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+ builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim)));
CompactionTask compactionTask = builder.build();
Assert.assertFalse(compactionTask.identifyMultiValuedDimensions());
}
@@ -1727,7 +1732,7 @@ public class CompactionTaskTest
@Test
public void testChooseFinestGranularityNone()
{
- List<Granularity> input = ImmutableList.of(
+ List<Granularity> input = List.of(
Granularities.DAY,
Granularities.SECOND,
Granularities.MINUTE,
@@ -1792,7 +1797,7 @@ public class CompactionTaskTest
private static List<DimensionsSpec>
getExpectedDimensionsSpecForAutoGeneration()
{
- return ImmutableList.of(
+ return List.of(
new DimensionsSpec(getDimensionSchema(new
StringDimensionSchema("string_to_double",
DimensionSchema.MultiValueHandling.ARRAY, null))),
new DimensionsSpec(getDimensionSchema(new
StringDimensionSchema("string_to_double",
DimensionSchema.MultiValueHandling.ARRAY, null))),
new DimensionsSpec(getDimensionSchema(new
StringDimensionSchema("string_to_double",
DimensionSchema.MultiValueHandling.ARRAY, null))),
@@ -1955,7 +1960,7 @@ public class CompactionTaskTest
List<Interval> intervals
)
{
- return
Futures.immediateFuture(ImmutableList.copyOf(segmentMap.keySet()));
+ return Futures.immediateFuture(List.copyOf(segmentMap.keySet()));
}
}
@@ -2032,6 +2037,236 @@ public class CompactionTaskTest
.build();
}
+ @Test
+ public void testMinorCompactionChecksIfSegmentsToCompactIsEmpty()
+ {
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new MinorCompactionInputSpec(COMPACTION_INTERVAL, List.of())
+ );
+ }
+
+ @Test
+ public void testMinorCompactionShouldAlwaysUseReplaceIngestionMode()
+ {
+ final Interval testInterval =
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+ final String version = "2024-11-17T23:49:06.823Z";
+ final DataSegment segment = createSegmentWithPartition(testInterval,
version, 1);
+
+ final MinorCompactionInputSpec minorSpec = new MinorCompactionInputSpec(
+ testInterval,
+ List.of(segment.toDescriptor())
+ );
+
+ Assert.assertThrows(
+ DruidException.class,
+ // Setting dropExisting == false disables REPLACE mode.
+ () -> new Builder(DATA_SOURCE, segmentCacheManagerFactory)
+ .inputSpec(minorSpec, false)
+ .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)).build()
+ );
+ }
+
+ @Test
+ public void testMinorCompactionUsesTimeChunkLockWithConcurrentLocks() throws
Exception
+ {
+ final Interval testInterval =
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+ final List<DataSegment> segments = List.of(
+ createSegmentWithPartition(testInterval, "v1", 0),
+ createSegmentWithPartition(testInterval, "v1", 1)
+ );
+ final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+ testInterval,
+
segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+ );
+
+ final CompactionTask task = new Builder(DATA_SOURCE,
segmentCacheManagerFactory)
+ .inputSpec(spec, true)
+ .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
+ .build();
+
+ taskActionTestKit.getTaskLockbox().add(task);
+ final TaskActionClient taskActionClient = new LocalTaskActionClient(
+ task,
+ taskActionTestKit.getTaskActionToolbox()
+ );
+ // Use a client that returns segments for RetrieveUsedSegmentsAction - wrap to inject segments
+ final TaskActionClient segmentAwareClient = new TaskActionClient()
+ {
+ @Override
+ public <RetType> RetType submit(TaskAction<RetType> action) throws
IOException
+ {
+ if (action instanceof RetrieveUsedSegmentsAction) {
+ @SuppressWarnings("unchecked")
+ RetType retVal = (RetType) segments;
+ return retVal;
+ }
+ return taskActionClient.submit(action);
+ }
+ };
+
+ task.determineLockGranularityAndTryLock(segmentAwareClient,
List.of(testInterval));
+ Assert.assertEquals(LockGranularity.TIME_CHUNK,
task.getTaskLockHelper().getLockGranularityToUse());
+ }
+
+ @Test
+ public void testNativeMinorCompactionSubtaskUsesTimeChunkLock() throws
Exception
+ {
+ final Interval testInterval =
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+ final List<DataSegment> segments = List.of(
+ createSegmentWithPartition(testInterval, "v1", 0),
+ createSegmentWithPartition(testInterval, "v1", 1)
+ );
+ final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+ testInterval,
+
segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+ );
+
+ final CompactionTask compactionTask = new Builder(DATA_SOURCE,
segmentCacheManagerFactory)
+ .inputSpec(spec, true)
+ .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
+ .build();
+
+ final NativeCompactionRunner runner = new
NativeCompactionRunner(segmentCacheManagerFactory);
+ final Map<String, Object> subtaskContext =
runner.createContextForSubtask(compactionTask);
+
+ final DataSchema dataSchema = DataSchema.builder()
+ .withDataSource(DATA_SOURCE)
+ .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
+ .withDimensions(
+ new DimensionsSpec(List.of(new StringDimensionSchema("dim1"), new
StringDimensionSchema("dim2")))
+ )
+ .withGranularity(
+ new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR,
false, List.of(testInterval))
+ )
+ .build();
+
+ final List<ParallelIndexIngestionSpec> ingestionSpecs =
NativeCompactionRunner.createIngestionSpecs(
+ Map.of(new MultipleIntervalSegmentSpec(List.of(testInterval)),
dataSchema),
+ toolbox,
+ new CompactionIOConfig(spec, false, null),
+ new PartitionConfigurationManager(null),
+ COORDINATOR_CLIENT,
+ segmentCacheManagerFactory
+ );
+
+ final ParallelIndexSupervisorTask subtask = new
ParallelIndexSupervisorTask(
+ "test_subtask",
+ "test_group",
+ null,
+ ingestionSpecs.get(0),
+ "base_0",
+ subtaskContext,
+ true
+ );
+
+ taskActionTestKit.getTaskLockbox().add(subtask);
+ final TaskActionClient taskActionClient = new LocalTaskActionClient(
+ subtask,
+ taskActionTestKit.getTaskActionToolbox()
+ );
+ final TaskActionClient segmentAwareClient = new TaskActionClient()
+ {
+ @Override
+ public <RetType> RetType submit(TaskAction<RetType> action) throws
IOException
+ {
+ if (action instanceof RetrieveUsedSegmentsAction) {
+ @SuppressWarnings("unchecked")
+ RetType retVal = (RetType) segments;
+ return retVal;
+ }
+ return taskActionClient.submit(action);
+ }
+ };
+ subtask.determineLockGranularityAndTryLock(segmentAwareClient,
List.of(testInterval));
+
+ Assert.assertEquals(
+ LockGranularity.TIME_CHUNK,
+ subtask.getTaskLockHelper().getLockGranularityToUse()
+ );
+ }
+
+ @Test
+ public void testSegmentProviderCheckSegmentsAllowsSubsetForTimeChunk()
throws Exception
+ {
+ final Interval testInterval =
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+ final List<DataSegment> allSegments = List.of(
+ createSegmentWithPartition(testInterval, "v1", 0),
+ createSegmentWithPartition(testInterval, "v1", 1),
+ createSegmentWithPartition(testInterval, "v1", 2)
+ );
+ final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+ testInterval,
+ List.of(allSegments.get(0).toDescriptor(),
allSegments.get(1).toDescriptor())
+ );
+
+ final SegmentProvider provider = new SegmentProvider(DATA_SOURCE, spec);
+ final TestTaskActionClient client = new TestTaskActionClient(allSegments);
+ provider.findSegments(client);
+
+ // Should not throw: specified segments (0,1) exist; segment 2 is not in spec but is in interval (will be upgraded)
+ provider.checkSegments(LockGranularity.TIME_CHUNK, allSegments);
+ }
+
+ @Test
+ public void testDruidInputSourceReceivesSegmentIdsForMinorCompaction()
+ {
+ final Interval interval = Intervals.of("2024-01-01/2024-01-02");
+ final List<DataSegment> segments = List.of(
+ createSegmentWithPartition(interval, "v1", 0),
+ createSegmentWithPartition(interval, "v1", 1)
+ );
+ final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+ interval,
+
segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+ );
+
+ final DataSchema dataSchema = DataSchema.builder()
+ .withDataSource(DATA_SOURCE)
+ .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
+ .withDimensions(
+ new DimensionsSpec(
+ List.of(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim2")
+ )
+ )
+ )
+ .withGranularity(
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.HOUR,
+ false,
+ List.of(interval)
+ )
+ )
+ .build();
+
+ final List<ParallelIndexIngestionSpec> ingestionSpecs =
NativeCompactionRunner.createIngestionSpecs(
+ Map.of(new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema),
+ toolbox,
+ new CompactionIOConfig(spec, false, null),
+ new PartitionConfigurationManager(null),
+ COORDINATOR_CLIENT,
+ segmentCacheManagerFactory
+ );
+
+ Assert.assertEquals(1, ingestionSpecs.size());
+ final InputSource inputSource =
ingestionSpecs.get(0).getIOConfig().getInputSource();
+ Assert.assertTrue(inputSource instanceof DruidInputSource);
+ final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
+ Assert.assertNotNull(druidInputSource.getSegmentIds());
+ Assert.assertEquals(2, druidInputSource.getSegmentIds().size());
+ }
+
+ private DataSegment createSegmentWithPartition(Interval interval, String
version, int partitionNum)
+ {
+ return DataSegment.builder(SegmentId.of(DATA_SOURCE, interval, version,
partitionNum))
+ .shardSpec(new NumberedShardSpec(partitionNum, 0))
+ .size(100)
+ .build();
+ }
+
private static class TestTaskActionClient implements TaskActionClient
{
private final List<DataSegment> segments;
@@ -2045,10 +2280,13 @@ public class CompactionTaskTest
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
- if (!(taskAction instanceof RetrieveUsedSegmentsAction)) {
- throw new ISE("action[%s] is not supported", taskAction);
+ if (taskAction instanceof RetrieveUsedSegmentsAction) {
+ return (RetType) segments;
+ }
+ if (taskAction instanceof MarkSegmentToUpgradeAction) {
+ return (RetType) Integer.valueOf(0);
}
- return (RetType) segments;
+ throw new ISE("action[%s] is not supported", taskAction);
}
}
@@ -2315,7 +2553,7 @@ public class CompactionTaskTest
@Override
public Set<ResourceAction> getInputSourceResources()
{
- return ImmutableSet.of();
+ return Set.of();
}
@JsonProperty
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java
index c5a6f2fda03..71c59bc4a4a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java
@@ -47,7 +47,7 @@ public class MinorCompactionInputSpecTest
Assert.assertEquals(spec, deserialized);
Assert.assertEquals(interval, deserialized.getInterval());
- Assert.assertEquals(segments, deserialized.getUncompactedSegments());
+ Assert.assertEquals(segments, deserialized.getSegments());
}
@Test
@@ -55,18 +55,18 @@ public class MinorCompactionInputSpecTest
{
ObjectMapper mapper = new DefaultObjectMapper();
String clientJson = "{"
- + "\"type\":\"uncompacted\","
+ + "\"type\":\"minor\","
+ "\"interval\":\"2015-04-11/2015-04-12\","
- + "\"uncompactedSegments\":[{\"itvl\":\"2015-04-11/2015-04-12\",\"ver\":\"v1\",\"part\":0}]"
+ + "\"segments\":[{\"itvl\":\"2015-04-11/2015-04-12\",\"ver\":\"v1\",\"part\":0}]"
+ "}";
MinorCompactionInputSpec deserialized = mapper.readValue(clientJson,
MinorCompactionInputSpec.class);
Assert.assertEquals(Intervals.of("2015-04-11/2015-04-12"),
deserialized.getInterval());
- Assert.assertEquals(1, deserialized.getUncompactedSegments().size());
+ Assert.assertEquals(1, deserialized.getSegments().size());
Assert.assertEquals(
new SegmentDescriptor(Intervals.of("2015-04-11/2015-04-12"), "v1", 0),
- deserialized.getUncompactedSegments().get(0)
+ deserialized.getSegments().get(0)
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java
new file mode 100644
index 00000000000..3a47c5a0ab7
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
+import org.joda.time.Interval;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TaskLockHelperTest
+{
+ private static final String DATA_SOURCE = "test_datasource";
+ private static final Interval TEST_INTERVAL =
Intervals.of("2017-01-01/2017-01-02");
+ private static final String TEST_VERSION = DateTimes.nowUtc().toString();
+
+ @Test
+ public void testVerifyNonConsecutiveSegmentsInInputFails()
+ {
+ // Test that non-consecutive segments within the input list fail.
+ // Compacting segments {0, 1, 3} should fail because root partition 2 is missing.
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 1), // rootPartitionRange [0, 1)
+ createSegment(1, 1, 2, (short) 1, (short) 1), // rootPartitionRange [1, 2)
+ createSegment(3, 3, 4, (short) 1, (short) 1) // rootPartitionRange [3, 4)
+ );
+
+ Assertions.assertThrows(
+ ISE.class,
+ () ->
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+ );
+ }
+
+ @Test
+ public void testVerifyConsecutiveSegmentsSucceedEvenIfOtherSegmentsMissing()
+ {
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(3, 3, 4, (short) 1, (short) 1), // rootPartitionRange [3, 4)
+ createSegment(4, 4, 5, (short) 1, (short) 1), // rootPartitionRange [4, 5)
+ createSegment(5, 5, 6, (short) 1, (short) 1) // rootPartitionRange [5, 6)
+ );
+
+
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+ }
+
+ @Test
+ public void testVerifyConsecutiveSegmentsStillWorks()
+ {
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 1),
+ createSegment(1, 1, 2, (short) 1, (short) 1),
+ createSegment(2, 2, 3, (short) 1, (short) 1)
+ );
+
+
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+ }
+
+ @Test
+ public void testVerifyLargeGapSegmentsFails()
+ {
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 1),
+ createSegment(1, 1, 2, (short) 1, (short) 1),
+ createSegment(10, 10, 11, (short) 1, (short) 1)
+ );
+
+ Assertions.assertThrows(
+ ISE.class,
+ () ->
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+ );
+ }
+
+ @Test
+ public void testVerifyAtomicUpdateGroupValidationStillWorks()
+ {
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 2),
+ createSegment(1, 0, 1, (short) 1, (short) 2)
+ );
+
+
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+ }
+
+ @Test
+ public void testVerifyAtomicUpdateGroupIncompleteFails()
+ {
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 3),
+ createSegment(1, 0, 1, (short) 1, (short) 3)
+ );
+
+ // Should throw ISE because atomicUpdateGroupSize is 3 but we only have 2 segments
+ Assertions.assertThrows(
+ ISE.class,
+ () ->
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+ );
+ }
+
+ @Test
+ public void testVerifyDifferentMinorVersionsFail()
+ {
+ // Test that segments with same root partition range but different minor versions fail
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 2),
+ createSegment(1, 0, 1, (short) 2, (short) 2) // Different minor version
+ );
+ Assertions.assertThrows(
+ ISE.class,
+ () ->
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+ );
+ }
+
+ @Test
+ public void testVerifyDifferentAtomicUpdateGroupSizesFail()
+ {
+ // Test that segments with same root partition range but different atomicUpdateGroupSize fail
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 2),
+ createSegment(1, 0, 1, (short) 1, (short) 3)
+ );
+ Assertions.assertThrows(
+ ISE.class,
+ () ->
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+ );
+ }
+
+ @Test
+ public void testVerifyEmptySegmentsList()
+ {
+ final List<DataSegment> segments = Collections.emptyList();
+
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+ }
+
+ @Test
+ public void testVerifySingleSegment()
+ {
+ final List<DataSegment> segments = ImmutableList.of(
+ createSegment(0, 0, 1, (short) 1, (short) 1)
+ );
+
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+ }
+
+ @Test
+ public void testVerifyDifferentIntervalsFail()
+ {
+ final Interval interval1 = Intervals.of("2017-01-01/2017-01-02");
+ final Interval interval2 = Intervals.of("2017-01-02/2017-01-03");
+ final List<DataSegment> segments = ImmutableList.of(
+ DataSegment.builder(SegmentId.of(DATA_SOURCE, interval1, TEST_VERSION,
0))
+ .shardSpec(new NumberedOverwriteShardSpec(
+ PartitionIds.NON_ROOT_GEN_START_PARTITION_ID,
+ 0,
+ 1,
+ (short) 1,
+ (short) 1
+ ))
+ .size(0)
+ .build(),
+ DataSegment.builder(SegmentId.of(DATA_SOURCE, interval2, TEST_VERSION,
0))
+ .shardSpec(new NumberedOverwriteShardSpec(
+ PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 1,
+ 1,
+ 2,
+ (short) 1,
+ (short) 1
+ ))
+ .size(0)
+ .build()
+ );
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+ );
+ }
+
+ /**
+ * Helper method to create a test segment with NumberedOverwriteShardSpec
+ */
+ private DataSegment createSegment(
+ int partitionId,
+ int startRootPartitionId,
+ int endRootPartitionId,
+ short minorVersion,
+ short atomicUpdateGroupSize
+ )
+ {
+ return DataSegment.builder(SegmentId.of(DATA_SOURCE, TEST_INTERVAL,
TEST_VERSION, partitionId))
+ .shardSpec(new NumberedOverwriteShardSpec(
+ PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + partitionId,
+ startRootPartitionId,
+ endRootPartitionId,
+ minorVersion,
+ atomicUpdateGroupSize
+ ))
+ .size(0)
+ .build();
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
index 9ef316e8aaa..882c1c73bb1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
@@ -31,7 +31,8 @@ import
org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
-import org.apache.druid.indexing.common.task.SpecificSegmentsSpec;
+import org.apache.druid.indexing.common.task.MinorCompactionInputSpec;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.DataSegmentsWithSchemas;
@@ -50,8 +51,11 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public class PartialCompactionTest extends
AbstractMultiPhaseParallelIndexingTest
{
@@ -73,7 +77,7 @@ public class PartialCompactionTest extends
AbstractMultiPhaseParallelIndexingTes
public PartialCompactionTest()
{
- super(LockGranularity.SEGMENT, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE,
DEFAULT_TRANSIENT_API_FAILURE_RATE);
+ super(LockGranularity.TIME_CHUNK, true,
DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
}
@Before
@@ -137,8 +141,14 @@ public class PartialCompactionTest extends
AbstractMultiPhaseParallelIndexingTes
);
}
final CompactionTask compactionTask = newCompactionTaskBuilder()
- .inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact))
+ .inputSpec(
+ new MinorCompactionInputSpec(
+ INTERVAL_TO_INDEX,
+
segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+ ), true
+ )
.tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2,
false))
+ .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
.build();
dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
verifySchema(dataSegmentsWithSchemas);
@@ -197,8 +207,14 @@ public class PartialCompactionTest extends
AbstractMultiPhaseParallelIndexingTes
);
}
final CompactionTask compactionTask = newCompactionTaskBuilder()
- .inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact))
+ .inputSpec(
+ new MinorCompactionInputSpec(
+ INTERVAL_TO_INDEX,
+
segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+ ), true
+ )
.tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2,
false))
+ .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
.build();
dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
@@ -213,6 +229,138 @@ public class PartialCompactionTest extends
AbstractMultiPhaseParallelIndexingTes
}
}
+ @Test
+ public void testMinorCompactionUpgradesNonCompactedSegments()
+ {
+ DataSegmentsWithSchemas dataSegmentsWithSchemas = runTestTask(
+ new HashedPartitionsSpec(null, 4, null),
+ TaskState.SUCCESS,
+ false
+ );
+ verifySchema(dataSegmentsWithSchemas);
+ final Map<Interval, List<DataSegment>> hashPartitionedSegments =
+
SegmentUtils.groupSegmentsByInterval(dataSegmentsWithSchemas.getSegments());
+ hashPartitionedSegments.values().forEach(
+ segmentsInInterval -> segmentsInInterval.sort(
+ Comparator.comparing(segment ->
segment.getShardSpec().getPartitionNum())
+ )
+ );
+
+ final List<DataSegment> segmentsToCompact = new ArrayList<>();
+ for (List<DataSegment> segmentsInInterval :
hashPartitionedSegments.values()) {
+ segmentsToCompact.addAll(segmentsInInterval.subList(0, Math.min(2,
segmentsInInterval.size())));
+ }
+ final Set<DataSegment> originalSegments =
dataSegmentsWithSchemas.getSegments();
+ final Set<String> compactedSegmentIds = segmentsToCompact.stream()
+ .map(segment -> segment.getId().toString())
+ .collect(Collectors.toSet());
+ final Set<String> nonCompactedSegmentIds =
+ originalSegments.stream()
+ .map(segment -> segment.getId().toString())
+ .filter(segmentId ->
!compactedSegmentIds.contains(segmentId))
+ .collect(Collectors.toSet());
+ Assert.assertFalse(nonCompactedSegmentIds.isEmpty());
+ final Set<String> originalSegmentIds = new HashSet<>(compactedSegmentIds);
+ originalSegmentIds.addAll(nonCompactedSegmentIds);
+
+ final CompactionTask compactionTask = newCompactionTaskBuilder()
+ .inputSpec(
+ new MinorCompactionInputSpec(
+ INTERVAL_TO_INDEX,
+
segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+ ), true
+ )
+ .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2,
false))
+ .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
+ .build();
+ dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
+ verifySchema(dataSegmentsWithSchemas);
+
+ // Check published segment set after compaction
+ final Set<DataSegment> publishedAfterCompaction =
dataSegmentsWithSchemas.getSegments();
+
Assert.assertFalse(SegmentUtils.groupSegmentsByInterval(publishedAfterCompaction).isEmpty());
+
+ final Set<String> finalSegmentIds = publishedAfterCompaction.stream()
+ .map(segment -> segment.getId().toString())
+ .collect(Collectors.toSet());
+
+ final Map<String, String> upgradedFromSegmentIdMap =
+ getStorageCoordinator().retrieveUpgradedFromSegmentIds(DATASOURCE,
finalSegmentIds);
+ Assert.assertFalse(upgradedFromSegmentIdMap.isEmpty());
+
Assert.assertTrue(upgradedFromSegmentIdMap.values().stream().noneMatch(compactedSegmentIds::contains));
+
Assert.assertTrue(originalSegmentIds.containsAll(upgradedFromSegmentIdMap.values()));
+ for (final String successorSegmentId : upgradedFromSegmentIdMap.keySet()) {
+ Assert.assertTrue(finalSegmentIds.contains(successorSegmentId));
+ }
+
+ // Validate new segment ids (replacements and/or upgraded replicas)
+ final Set<String> newPublishedSegmentIds = new HashSet<>(finalSegmentIds);
+ newPublishedSegmentIds.removeAll(originalSegmentIds);
+ Assert.assertFalse(newPublishedSegmentIds.isEmpty());
+ Assert.assertTrue(
+ newPublishedSegmentIds.stream().anyMatch(id ->
!upgradedFromSegmentIdMap.containsKey(id))
+ );
+
+ // Index newly published ids by day for compacted-source checks below.
+ final Map<Interval, Set<String>> newSegmentIdsByInterval =
+ publishedAfterCompaction.stream()
+ .filter(segment -> !segment.isTombstone()
+ &&
newPublishedSegmentIds.contains(segment.getId().toString())
+ )
+ .collect(Collectors.groupingBy(
+ DataSegment::getInterval,
+ Collectors.mapping(
+ segment -> segment.getId().toString(),
+ Collectors.toCollection(HashSet::new)
+ )
+ ));
+
+ // Verify non-compacted segments are being replaced
+ for (final String parentSegmentId : nonCompactedSegmentIds) {
+ final List<String> successorSegmentIds = upgradedFromSegmentIdMap.entrySet()
+ .stream()
+ .filter(e -> parentSegmentId.equals(e.getValue()))
+ .map(Map.Entry::getKey)
+ .toList();
+ if (finalSegmentIds.contains(parentSegmentId)) {
+ Assert.assertTrue(successorSegmentIds.isEmpty());
+ } else if (!successorSegmentIds.isEmpty()) {
+ Assert.assertEquals(1, successorSegmentIds.size());
+
Assert.assertTrue(finalSegmentIds.contains(successorSegmentIds.get(0)));
+ }
+ }
+
+ // Verify compacted segments have new published ID
+ for (final DataSegment compactedSource : segmentsToCompact) {
+ final String compactedSourceId = compactedSource.getId().toString();
+ Assert.assertFalse(finalSegmentIds.contains(compactedSourceId));
+ final Set<String> newIdsInSameInterval =
newSegmentIdsByInterval.getOrDefault(compactedSource.getInterval(), Set.of());
+ Assert.assertFalse(newIdsInSameInterval.isEmpty());
+ }
+
+ // non-compacted parents removed from published set match retrieveUpgradedToSegmentIds
+ final Set<String> removedNonCompactedParentIds =
+ nonCompactedSegmentIds.stream().filter(id ->
!finalSegmentIds.contains(id)).collect(Collectors.toSet());
+ if (!removedNonCompactedParentIds.isEmpty()) {
+ final Map<String, Set<String>> upgradedToSegmentIdsByParent =
+ getStorageCoordinator().retrieveUpgradedToSegmentIds(DATASOURCE,
removedNonCompactedParentIds);
+ for (final String parentSegmentId : removedNonCompactedParentIds) {
+ final Set<String> expectedSuccessorIds = upgradedFromSegmentIdMap.entrySet()
+ .stream()
+ .filter(e -> parentSegmentId.equals(e.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ if (expectedSuccessorIds.isEmpty()) {
+ continue;
+ }
+ final Set<String> coordinatorSuccessorIds =
+ new
HashSet<>(upgradedToSegmentIdsByParent.getOrDefault(parentSegmentId, Set.of()));
+ coordinatorSuccessorIds.remove(parentSegmentId);
+
Assert.assertTrue(coordinatorSuccessorIds.containsAll(expectedSuccessorIds));
+ }
+ }
+ }
+
private DataSegmentsWithSchemas runTestTask(
PartitionsSpec partitionsSpec,
TaskState expectedTaskState,
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
index 751ec8d462a..50e5dd44bd5 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
@@ -35,22 +35,22 @@ import java.util.stream.Collectors;
*/
public class ClientMinorCompactionInputSpec extends
ClientCompactionIntervalSpec
{
- public static final String TYPE = "uncompacted";
+ public static final String TYPE = "minor";
- private final List<SegmentDescriptor> uncompactedSegments;
+ private final List<SegmentDescriptor> segments;
@JsonCreator
public ClientMinorCompactionInputSpec(
@JsonProperty("interval") Interval interval,
- @JsonProperty("uncompactedSegments") List<SegmentDescriptor>
uncompactedSegments
+ @JsonProperty("segments") List<SegmentDescriptor> segments
)
{
super(interval, null);
- if (uncompactedSegments == null || uncompactedSegments.isEmpty()) {
- throw InvalidInput.exception("'uncompactedSegments' must be non-empty.");
+ if (segments == null || segments.isEmpty()) {
+ throw InvalidInput.exception("'segments' must be non-empty.");
} else if (interval != null) {
List<SegmentDescriptor> segmentsNotInInterval =
- uncompactedSegments.stream().filter(s ->
!interval.contains(s.getInterval())).collect(Collectors.toList());
+ segments.stream().filter(s ->
!interval.contains(s.getInterval())).collect(Collectors.toList());
if (!segmentsNotInInterval.isEmpty()) {
throw new IAE(
"Can not supply segments outside interval[%s], got segments[%s].",
@@ -59,13 +59,13 @@ public class ClientMinorCompactionInputSpec extends
ClientCompactionIntervalSpec
);
}
}
- this.uncompactedSegments = uncompactedSegments;
+ this.segments = segments;
}
@JsonProperty
- public List<SegmentDescriptor> getUncompactedSegments()
+ public List<SegmentDescriptor> getSegments()
{
- return uncompactedSegments;
+ return segments;
}
@Override
@@ -81,13 +81,13 @@ public class ClientMinorCompactionInputSpec extends
ClientCompactionIntervalSpec
return false;
}
ClientMinorCompactionInputSpec that = (ClientMinorCompactionInputSpec)
object;
- return Objects.equals(uncompactedSegments, that.uncompactedSegments);
+ return Objects.equals(segments, that.segments);
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), uncompactedSegments);
+ return Objects.hash(super.hashCode(), segments);
}
@Override
@@ -95,7 +95,7 @@ public class ClientMinorCompactionInputSpec extends
ClientCompactionIntervalSpec
{
return "ClientMinorCompactionInputSpec{" +
"interval=" + getInterval() +
- ",uncompactedSegments=" + uncompactedSegments +
+ ",segments=" + segments +
'}';
}
}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
index 7e116a271eb..c6a41d7fe81 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
@@ -136,7 +136,7 @@ public class ClientCompactionIntervalSpecTest
ObjectMapper mapper = new DefaultObjectMapper();
Interval interval = Intervals.of("2015-04-11/2015-04-12");
- // Test without uncompactedSegments (full compaction)
+ // Test without segments (full compaction)
ClientCompactionIntervalSpec withoutSegments = new
ClientCompactionIntervalSpec(interval, null);
String json2 = mapper.writeValueAsString(withoutSegments);
ClientCompactionIntervalSpec deserialized2 = mapper.readValue(json2,
ClientCompactionIntervalSpec.class);
@@ -152,12 +152,12 @@ public class ClientCompactionIntervalSpecTest
new SegmentDescriptor(Intervals.of("2015-04-11/2015-04-12"), "v1", 0)
);
- // Test with uncompactedSegments (minor compaction)
+ // Test with segments (minor compaction)
ClientCompactionInputSpec withSegments = new
ClientMinorCompactionInputSpec(interval, segments);
String json1 = mapper.writeValueAsString(withSegments);
ClientCompactionInputSpec deserialized1 = mapper.readValue(json1,
ClientCompactionIntervalSpec.class);
Assert.assertTrue(deserialized1 instanceof ClientMinorCompactionInputSpec);
Assert.assertEquals(withSegments, deserialized1);
- Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec)
deserialized1).getUncompactedSegments());
+ Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec)
deserialized1).getSegments());
}
}
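
Reviewer note: the constructor check in ClientMinorCompactionInputSpec (non-empty 'segments', every segment inside the spec interval) can be read in isolation as the sketch below. This is a dependency-free illustration and not the Druid code. The class MinorSpecValidationSketch, the Descriptor record, and the use of epoch-millisecond longs in place of Joda intervals are assumptions made for the example, and IllegalArgumentException stands in for the exception types thrown in the diff.

```java
import java.util.List;

public class MinorSpecValidationSketch
{
  /** Hypothetical stand-in for a segment descriptor; times are epoch millis, end exclusive. */
  record Descriptor(long start, long end, String version, int partitionNum) {}

  /**
   * Mirrors the two checks visible in the diff: the segment list must be
   * non-empty, and every segment interval must lie inside the spec interval.
   */
  static void validate(long intervalStart, long intervalEnd, List<Descriptor> segments)
  {
    if (segments == null || segments.isEmpty()) {
      throw new IllegalArgumentException("'segments' must be non-empty.");
    }
    for (Descriptor d : segments) {
      // A segment is contained when it starts no earlier and ends no later than the spec interval.
      boolean contained = intervalStart <= d.start() && d.end() <= intervalEnd;
      if (!contained) {
        throw new IllegalArgumentException("Can not supply segments outside interval, got " + d);
      }
    }
  }

  public static void main(String[] args)
  {
    // Accepted: one segment exactly covering the interval.
    validate(0L, 100L, List.of(new Descriptor(0L, 100L, "v1", 0)));

    // Rejected: an empty segment list.
    try {
      validate(0L, 100L, List.of());
    }
    catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The sketch checks the list before the interval, in the same order as the constructor in the diff, so an empty list is reported before any interval mismatch.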