This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d25caaefa42 Add support for streaming ingestion with concurrent
replace (#15039)
d25caaefa42 is described below
commit d25caaefa4282a6b61663e8184d3c0b4092f6f35
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Oct 13 09:09:03 2023 +0530
Add support for streaming ingestion with concurrent replace (#15039)
Add support for streaming ingestion with concurrent replace
---------
Co-authored-by: Kashif Faraz <[email protected]>
---
.../actions/SegmentTransactionalAppendAction.java | 100 ++++--
.../actions/SegmentTransactionalInsertAction.java | 44 +--
.../actions/SegmentTransactionalReplaceAction.java | 62 +++-
.../druid/indexing/common/actions/TaskLocks.java | 2 +
.../common/task/AbstractBatchIndexTask.java | 12 +-
.../task/AppenderatorDriverRealtimeIndexTask.java | 3 +-
.../druid/indexing/common/task/IndexTask.java | 8 +-
.../druid/indexing/common/task/IndexTaskUtils.java | 49 ++-
.../parallel/ParallelIndexSupervisorTask.java | 7 +-
.../druid/indexing/overlord/TaskLockbox.java | 62 +++-
.../indexing/overlord/TaskStorageQueryAdapter.java | 10 +
.../indexing/overlord/http/OverlordResource.java | 16 +
.../overlord/supervisor/SupervisorManager.java | 51 +++
.../seekablestream/PendingSegmentVersions.java | 56 +++
.../seekablestream/SeekableStreamIndexTask.java | 3 +-
.../SeekableStreamIndexTaskClient.java | 16 +
.../SeekableStreamIndexTaskClientAsyncImpl.java | 18 +
.../SeekableStreamIndexTaskRunner.java | 57 +++-
.../indexing/seekablestream/SequenceMetadata.java | 41 ++-
.../supervisor/SeekableStreamSupervisor.java | 20 ++
.../common/task/concurrent/ActionsTestTask.java | 30 +-
.../concurrent/ConcurrentReplaceAndAppendTest.java | 91 ++++-
.../druid/indexing/overlord/TaskLockboxTest.java | 77 +++++
.../seekablestream/SequenceMetadataTest.java | 30 +-
.../TestIndexerMetadataStorageCoordinator.java | 23 +-
.../util/emitter/service/SegmentMetadataEvent.java | 13 +
.../emitter/service/SegmentMetadataEventTest.java | 31 ++
.../IndexerMetadataStorageCoordinator.java | 33 +-
.../IndexerSQLMetadataStorageCoordinator.java | 376 +++++++++++++++------
.../apache/druid/metadata/LockFilterPolicy.java | 88 +++++
.../apache/druid/rpc/indexing/OverlordClient.java | 15 +-
.../druid/rpc/indexing/OverlordClientImpl.java | 9 +-
.../appenderator/SinkQuerySegmentWalker.java | 18 +-
.../realtime/appenderator/StreamAppenderator.java | 69 +++-
.../server/coordinator/duty/CompactSegments.java | 20 +-
.../druid/server/http/DataSourcesResource.java | 23 +-
.../druid/client/indexing/NoopOverlordClient.java | 5 +-
.../druid/rpc/indexing/OverlordClientImplTest.java | 21 +-
.../coordinator/duty/CompactSegmentsTest.java | 6 +-
39 files changed, 1314 insertions(+), 301 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index 171c4f6640f..67b701718ca 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -22,15 +22,20 @@ package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,18 +47,40 @@ import java.util.stream.Collectors;
public class SegmentTransactionalAppendAction implements
TaskAction<SegmentPublishResult>
{
private final Set<DataSegment> segments;
+ @Nullable
+ private final DataSourceMetadata startMetadata;
+ @Nullable
+ private final DataSourceMetadata endMetadata;
- public static SegmentTransactionalAppendAction create(Set<DataSegment>
segments)
+ public static SegmentTransactionalAppendAction forSegments(Set<DataSegment>
segments)
{
- return new SegmentTransactionalAppendAction(segments);
+ return new SegmentTransactionalAppendAction(segments, null, null);
+ }
+
+ public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
+ Set<DataSegment> segments,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ )
+ {
+ return new SegmentTransactionalAppendAction(segments, startMetadata,
endMetadata);
}
@JsonCreator
private SegmentTransactionalAppendAction(
- @JsonProperty("segments") Set<DataSegment> segments
+ @JsonProperty("segments") Set<DataSegment> segments,
+ @JsonProperty("startMetadata") @Nullable DataSourceMetadata
startMetadata,
+ @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
)
{
this.segments = segments;
+ this.startMetadata = startMetadata;
+ this.endMetadata = endMetadata;
+
+ if ((startMetadata == null && endMetadata != null)
+ || (startMetadata != null && endMetadata == null)) {
+ throw InvalidInput.exception("startMetadata and endMetadata must either
be both null or both non-null.");
+ }
}
@JsonProperty
@@ -62,6 +89,20 @@ public class SegmentTransactionalAppendAction implements
TaskAction<SegmentPubli
return segments;
}
+ @JsonProperty
+ @Nullable
+ public DataSourceMetadata getStartMetadata()
+ {
+ return startMetadata;
+ }
+
+ @JsonProperty
+ @Nullable
+ public DataSourceMetadata getEndMetadata()
+ {
+ return endMetadata;
+ }
+
@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
@@ -70,30 +111,48 @@ public class SegmentTransactionalAppendAction implements
TaskAction<SegmentPubli
};
}
- /**
- * Performs some sanity checks and publishes the given segments.
- */
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
+ // Verify that all the locks are of expected type
+ final List<TaskLock> locks =
toolbox.getTaskLockbox().findLocksForTask(task);
+ for (TaskLock lock : locks) {
+ if (lock.getType() != TaskLockType.APPEND) {
+ throw InvalidInput.exception(
+ "Cannot use action[%s] for task[%s] as it is holding a lock of
type[%s] instead of [APPEND].",
+ "SegmentTransactionalAppendAction", task.getId(), lock.getType()
+ );
+ }
+ }
+
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(),
segments);
final String datasource = task.getDataSource();
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= TaskLocks.findReplaceLocksCoveringSegments(datasource,
toolbox.getTaskLockbox(), segments);
+ final CriticalAction.Action<SegmentPublishResult> publishAction;
+ if (startMetadata == null) {
+ publishAction = () ->
toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
+ segments,
+ segmentToReplaceLock
+ );
+ } else {
+ publishAction = () ->
toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
+ segments,
+ segmentToReplaceLock,
+ startMetadata,
+ endMetadata
+ );
+ }
+
final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
- .onValidLocks(
- () ->
toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
- segments,
- segmentToReplaceLock
- )
- )
+ .onValidLocks(publishAction)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher
priority task."
@@ -107,20 +166,7 @@ public class SegmentTransactionalAppendAction implements
TaskAction<SegmentPubli
throw new RuntimeException(e);
}
- // Emit metrics
- final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
- if (retVal.isSuccess()) {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success",
1));
- for (DataSegment segment : retVal.getSegments()) {
- IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
-
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes",
segment.getSize()));
- }
- } else {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure",
1));
- }
-
+ IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 9b23db71d46..5a9ca0cacdf 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -33,13 +33,8 @@ import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -222,47 +217,10 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
throw new RuntimeException(e);
}
- // Emit metrics
- final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
- if (retVal.isSuccess()) {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success",
1));
- } else {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure",
1));
- }
-
- // getSegments() should return an empty set if
announceHistoricalSegments() failed
- for (DataSegment segment : retVal.getSegments()) {
- metricBuilder.setDimension(DruidMetrics.INTERVAL,
segment.getInterval().toString());
- metricBuilder.setDimension(
- DruidMetrics.PARTITIONING_TYPE,
- segment.getShardSpec() == null ? null :
segment.getShardSpec().getType()
- );
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes",
segment.getSize()));
- // Emit the segment related metadata using the configured emitters.
- // There is a possibility that some segments' metadata event might get
missed if the
- // server crashes after commiting segment but before emitting the event.
- this.emitSegmentMetadata(segment, toolbox);
- }
-
+ IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}
- private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox
toolbox)
- {
- SegmentMetadataEvent event = new SegmentMetadataEvent(
- segment.getDataSource(),
- DateTime.now(DateTimeZone.UTC),
- segment.getInterval().getStart(),
- segment.getInterval().getEnd(),
- segment.getVersion(),
- segment.getLastCompactionState() != null
- );
-
- toolbox.getEmitter().emit(event);
- }
-
private void checkWithSegmentLock()
{
final Map<Interval, List<DataSegment>> oldSegmentsMap =
groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index 5a1228e1dd1..5a2b3ceec8f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -22,17 +22,20 @@ package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ReplaceTaskLock;
-import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,6 +45,8 @@ import java.util.stream.Collectors;
*/
public class SegmentTransactionalReplaceAction implements
TaskAction<SegmentPublishResult>
{
+ private static final Logger log = new
Logger(SegmentTransactionalReplaceAction.class);
+
/**
* Set of segments to be inserted into metadata storage
*/
@@ -88,9 +93,9 @@ public class SegmentTransactionalReplaceAction implements
TaskAction<SegmentPubl
final Set<ReplaceTaskLock> replaceLocksForTask
= toolbox.getTaskLockbox().findReplaceLocksForTask(task);
- final SegmentPublishResult retVal;
+ final SegmentPublishResult publishResult;
try {
- retVal = toolbox.getTaskLockbox().doInCriticalSection(
+ publishResult = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
@@ -111,24 +116,45 @@ public class SegmentTransactionalReplaceAction implements
TaskAction<SegmentPubl
throw new RuntimeException(e);
}
- // Emit metrics
- final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
- if (retVal.isSuccess()) {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success",
1));
+ IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);
- for (DataSegment segment : retVal.getSegments()) {
- final String partitionType = segment.getShardSpec() == null ? null :
segment.getShardSpec().getType();
- metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE,
partitionType);
- metricBuilder.setDimension(DruidMetrics.INTERVAL,
segment.getInterval().toString());
-
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes",
segment.getSize()));
+ // Upgrade any overlapping pending segments
+ // Do not perform upgrade in the same transaction as replace commit so that
+ // failure to upgrade pending segments does not affect success of the
commit
+ if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
+ try {
+ tryUpgradeOverlappingPendingSegments(task, toolbox);
+ }
+ catch (Exception e) {
+ log.error(e, "Error while upgrading pending segments for task[%s]",
task.getId());
}
- } else {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure",
1));
}
- return retVal;
+ return publishResult;
+ }
+
+ /**
+ * Tries to upgrade any pending segments that overlap with the committed
segments.
+ */
+ private void tryUpgradeOverlappingPendingSegments(Task task,
TaskActionToolbox toolbox)
+ {
+ final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
+ final Optional<String> activeSupervisorId =
supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
+ if (!activeSupervisorId.isPresent()) {
+ return;
+ }
+
+ Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradedPendingSegments =
+
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
+ log.info(
+ "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
+ upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
+ );
+
+ upgradedPendingSegments.forEach(
+ (oldId, newId) -> toolbox.getSupervisorManager()
+
.registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(),
oldId, newId)
+ );
}
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
index bb835997801..d6064935925 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
@@ -125,6 +125,8 @@ public class TaskLocks
&&
timeChunkLock.getDataSource().equals(segment.getDataSource())
&&
(timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
||
TaskLockType.APPEND.equals(timeChunkLock.getType()));
+ // APPEND locks always have the version DateTimes.EPOCH
(1970-01-01)
+ // and cover the segments irrespective of the segment version
} else {
final SegmentLock segmentLock = (SegmentLock) lock;
return
segmentLock.getInterval().contains(segment.getInterval())
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index ea61f37c7e9..fe19b35391e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -401,21 +401,21 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
/**
* Builds a TaskAction to publish segments based on the type of locks that
this
- * task acquires (determined by context property {@link
Tasks#TASK_LOCK_TYPE}).
+ * task acquires.
+ *
+ * @see #determineLockType
*/
protected TaskAction<SegmentPublishResult> buildPublishAction(
Set<DataSegment> segmentsToBeOverwritten,
- Set<DataSegment> segmentsToPublish
+ Set<DataSegment> segmentsToPublish,
+ TaskLockType lockType
)
{
- TaskLockType lockType = TaskLockType.valueOf(
- getContextValue(Tasks.TASK_LOCK_TYPE,
Tasks.DEFAULT_TASK_LOCK_TYPE.name())
- );
switch (lockType) {
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
case APPEND:
- return SegmentTransactionalAppendAction.create(segmentsToPublish);
+ return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
default:
return
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
segmentsToPublish);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index dfa1f85fde7..3a599dd485b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -27,7 +27,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -696,7 +695,7 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
);
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
- (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>)
driver::registerHandoff,
+ driver::registerHandoff,
MoreExecutors.directExecutor()
));
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a2ca4f869ea..d880f3eb86a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -910,10 +911,11 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
throw new UOE("[%s] secondary partition type is not supported",
partitionsSpec.getType());
}
-
+ final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
- (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
-
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten,
segmentsToPublish));
+ (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
+ buildPublishAction(segmentsToBeOverwritten, segmentsToPublish,
taskLockType)
+ );
String effectiveId =
getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 20f7584c8eb..79a3e8993a8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.actions.TaskActionToolbox;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -35,7 +37,6 @@ import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
-import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@@ -45,29 +46,6 @@ import java.util.Map;
public class IndexTaskUtils
{
- @Nullable
- public static List<String> getMessagesFromSavedParseExceptions(
- CircularBuffer<ParseException> savedParseExceptions,
- boolean includeTimeOfException
- )
- {
- if (savedParseExceptions == null) {
- return null;
- }
-
- List<String> events = new ArrayList<>();
- for (int i = 0; i < savedParseExceptions.size(); i++) {
- if (includeTimeOfException) {
- DateTime timeOfException =
DateTimes.utc(savedParseExceptions.getLatest(i).getTimeOfExceptionMillis());
- events.add(timeOfException + ", " +
savedParseExceptions.getLatest(i).getMessage());
- } else {
- events.add(savedParseExceptions.getLatest(i).getMessage());
- }
- }
-
- return events;
- }
-
@Nullable
public static List<ParseExceptionReport>
getReportListFromSavedParseExceptions(
CircularBuffer<ParseExceptionReport> savedParseExceptionReports
@@ -152,4 +130,25 @@ public class IndexTaskUtils
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL,
segment.getInterval().toString());
}
+
+ public static void emitSegmentPublishMetrics(
+ SegmentPublishResult publishResult,
+ Task task,
+ TaskActionToolbox toolbox
+ )
+ {
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+
+ if (publishResult.isSuccess()) {
+ toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success",
1));
+ for (DataSegment segment : publishResult.getSegments()) {
+ IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
+
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes",
segment.getSize()));
+ toolbox.getEmitter().emit(SegmentMetadataEvent.create(segment,
DateTimes.nowUtc()));
+ }
+ } else {
+ toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure",
1));
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index d3e218623cd..e99ef35d942 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -42,6 +42,7 @@ import
org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -1167,9 +1168,11 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
}
+ final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
- (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
-
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten,
segmentsToPublish));
+ (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
+ buildPublishAction(segmentsToBeOverwritten, segmentsToPublish,
taskLockType)
+ );
final boolean published =
newSegments.isEmpty()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 761c0b59160..54191adf05d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
@@ -38,12 +39,14 @@ import
org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
@@ -941,7 +944,7 @@ public class TaskLockbox
// Replace locks are always held by the supervisor task
if (posse.taskIds.size() > 1) {
- throw new ISE(
+ throw DruidException.defensive(
"Replace lock[%s] for datasource[%s] is held by multiple
tasks[%s]",
lock, datasource, posse.taskIds
);
@@ -956,6 +959,63 @@ public class TaskLockbox
return replaceLocks;
}
+ /**
+ * @param lockFilterPolicies Lock filters for the given datasources
+ * @return Map from datasource to intervals locked by tasks satisfying the
lock filter conditions
+ */
+ public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy>
lockFilterPolicies)
+ {
+ final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+
+ // Take a lock and populate the maps
+ giant.lock();
+
+ try {
+ lockFilterPolicies.forEach(
+ lockFilter -> {
+ final String datasource = lockFilter.getDatasource();
+ if (!running.containsKey(datasource)) {
+ return;
+ }
+
+ final int priority = lockFilter.getPriority();
+ final boolean ignoreAppendLocks =
+
TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE));
+
+ running.get(datasource).forEach(
+ (startTime, startTimeLocks) -> startTimeLocks.forEach(
+ (interval, taskLockPosses) -> taskLockPosses.forEach(
+ taskLockPosse -> {
+ if (taskLockPosse.getTaskLock().isRevoked()) {
+ // do nothing
+ } else if (ignoreAppendLocks
+ &&
TaskLockType.APPEND.equals(taskLockPosse.getTaskLock().getType())) {
+ // do nothing
+ } else if (taskLockPosse.getTaskLock().getPriority()
== null
+ ||
taskLockPosse.getTaskLock().getPriority() < priority) {
+ // do nothing
+ } else {
+ datasourceToIntervals.computeIfAbsent(datasource,
k -> new HashSet<>())
+ .add(interval);
+ }
+ }
+ )
+ )
+ );
+ }
+ );
+ }
+ finally {
+ giant.unlock();
+ }
+
+ return datasourceToIntervals.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> new ArrayList<>(entry.getValue())
+ ));
+ }
+
/**
* Gets a List of Intervals locked by higher priority tasks for each
datasource.
* Here, Segment Locks are being treated the same as Time Chunk Locks i.e.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
index 3fa570ccb32..140d9b7ac40 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -28,6 +28,7 @@ import
org.apache.druid.indexing.common.actions.SegmentInsertAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@@ -60,6 +61,15 @@ public class TaskStorageQueryAdapter
return storage.getActiveTasks();
}
+ /**
+ * @param lockFilterPolicies Requests for conflicting lock intervals for
various datasources
+ * @return Map from datasource to intervals locked by tasks that have a
conflicting lock type that cannot be revoked
+ */
+ public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy>
lockFilterPolicies)
+ {
+ return taskLockbox.getLockedIntervals(lockFilterPolicies);
+ }
+
/**
* Gets a List of Intervals locked by higher priority tasks for each
datasource.
*
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index fa61f796154..f9604492dd4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -63,6 +63,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
@@ -260,6 +261,7 @@ public class OverlordResource
}
}
+ @Deprecated
@POST
@Path("/lockedIntervals")
@Produces(MediaType.APPLICATION_JSON)
@@ -274,6 +276,20 @@ public class OverlordResource
return
Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
}
+ @POST
+ @Path("/lockedIntervals/v2")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy>
lockFilterPolicies)
+ {
+ if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
+ return Response.status(Status.BAD_REQUEST).entity("No filter
provided").build();
+ }
+
+ // Build the response
+ return
Response.ok(taskStorageQueryAdapter.getLockedIntervals(lockFilterPolicies)).build();
+ }
+
@GET
@Path("/task/{taskid}")
@Produces(MediaType.APPLICATION_JSON)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 2cd926bae90..d55f3cc8bd0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -24,12 +24,14 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import javax.annotation.Nullable;
@@ -69,6 +71,22 @@ public class SupervisorManager
return supervisors.keySet();
}
+ public Optional<String> getActiveSupervisorIdForDatasource(String datasource)
+ {
+ for (Map.Entry<String, Pair<Supervisor, SupervisorSpec>> entry :
supervisors.entrySet()) {
+ final String supervisorId = entry.getKey();
+ final Supervisor supervisor = entry.getValue().lhs;
+ final SupervisorSpec supervisorSpec = entry.getValue().rhs;
+ if (supervisor instanceof SeekableStreamSupervisor
+ && !supervisorSpec.isSuspended()
+ && supervisorSpec.getDataSources().contains(datasource)) {
+ return Optional.of(supervisorId);
+ }
+ }
+
+ return Optional.absent();
+ }
+
public Optional<SupervisorSpec> getSupervisorSpec(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
@@ -247,6 +265,39 @@ public class SupervisorManager
return false;
}
+  /**
+   * Registers a new version of the given pending segment on a supervisor. This
+   * allows the supervisor to include the pending segment in queries fired against
+   * that segment version.
+   *
+   * @param supervisorId       id of the supervisor to notify
+   * @param basePendingSegment pending segment for which a new version is being registered
+   * @param newSegmentVersion  new version of the base pending segment
+   * @return true if the request was handed to a {@link SeekableStreamSupervisor};
+   *         false if the supervisor is of another type or if anything failed
+   *         (failures are logged rather than thrown)
+   */
+  public boolean registerNewVersionOfPendingSegmentOnSupervisor(
+      String supervisorId,
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newSegmentVersion
+  )
+  {
+    try {
+      Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
+      Preconditions.checkNotNull(basePendingSegment, "basePendingSegment cannot be null");
+      Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null");
+
+      Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+      Preconditions.checkNotNull(supervisor, "supervisor could not be found");
+      if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) {
+        return false;
+      }
+
+      SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = (SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
+      seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
+      return true;
+    }
+    catch (Exception e) {
+      // Either segment argument may be null here (the null checks above throw into
+      // this handler), so dereference them defensively while building the log line.
+      log.error(
+          e,
+          "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
+          basePendingSegment == null ? null : basePendingSegment.asSegmentId(),
+          newSegmentVersion == null ? null : newSegmentVersion.getVersion(),
+          supervisorId
+      );
+    }
+    return false;
+  }
+
/**
* Stops a supervisor with a given id and then removes it from the list.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
new file mode 100644
index 00000000000..146b0afc4b9
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+
+/**
+ * Contains a new version of an existing base pending segment. Used by realtime
+ * tasks to serve queries against multiple versions of the same pending
segment.
+ */
+public class PendingSegmentVersions
+{
+ private final SegmentIdWithShardSpec baseSegment;
+ private final SegmentIdWithShardSpec newVersion;
+
+ @JsonCreator
+ public PendingSegmentVersions(
+ @JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment,
+ @JsonProperty("newVersion") SegmentIdWithShardSpec newVersion
+ )
+ {
+ this.baseSegment = baseSegment;
+ this.newVersion = newVersion;
+ }
+
+ @JsonProperty
+ public SegmentIdWithShardSpec getBaseSegment()
+ {
+ return baseSegment;
+ }
+
+ @JsonProperty
+ public SegmentIdWithShardSpec getNewVersion()
+ {
+ return newVersion;
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 3aca46fbfae..d74ee5c0be2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -106,7 +107,7 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
this.lockGranularityToUse =
getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
? LockGranularity.TIME_CHUNK
: LockGranularity.SEGMENT;
- this.lockTypeToUse = getContextValue(Tasks.USE_SHARED_LOCK, false) ?
TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
+ this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
}
protected static String getFormattedGroupId(String dataSource, String type)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
index 18631626d0f..5e592424960 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import java.util.List;
@@ -153,6 +154,21 @@ public interface
SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetTy
*/
ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(String
id);
+ /**
+ * Update the task state to redirect queries for later versions to the base
pending segment.
+ * The task also announces that it is serving the segments belonging to the
subsequent versions.
+ * The update is processed only if the task is serving the original pending
segment.
+ * @param taskId - task id
+ * @param basePendingSegment - the pending segment that was originally
allocated
+ * @param newVersionOfSegment - the id belonging to the new version to which
the base pending segment needs to be updated
+ * @return true if the update succeeds
+ */
+ ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
+ String taskId,
+ SegmentIdWithShardSpec basePendingSegment,
+ SegmentIdWithShardSpec newVersionOfSegment
+ );
+
Class<PartitionIdType> getPartitionType();
Class<SequenceOffsetType> getSequenceType();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
index 9d6d49e00bf..40d475909e6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
@@ -57,6 +57,7 @@ import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@@ -193,6 +194,23 @@ public abstract class
SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, Se
.go();
}
+ @Override
+ public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
+ String taskId,
+ SegmentIdWithShardSpec basePendingSegment,
+ SegmentIdWithShardSpec newVersionOfSegment
+ )
+ {
+ final RequestBuilder requestBuilder
+ = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
+ .jsonContent(jsonMapper, new
PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
+
+ return makeRequest(taskId, requestBuilder)
+ .handler(IgnoreHttpResponseHandler.INSTANCE)
+ .onSuccess(r -> true)
+ .go();
+ }
+
@Override
public ListenableFuture<Boolean> setEndOffsetsAsync(
final String id,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 27909aea83c..769413d6ffc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -46,6 +46,8 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -60,6 +62,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import
org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
+import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -83,6 +86,7 @@ import
org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Access;
@@ -319,7 +323,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
previous.getValue(),
current.getValue(),
true,
- exclusiveStartPartitions
+ exclusiveStartPartitions,
+ getTaskLockType()
)
);
previous = current;
@@ -334,7 +339,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
previous.getValue(),
endOffsets,
false,
- exclusiveStartPartitions
+ exclusiveStartPartitions,
+ getTaskLockType()
)
);
} else {
@@ -345,7 +351,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
endOffsets,
false,
- ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+ ioConfig.getStartSequenceNumbers().getExclusivePartitions(),
+ getTaskLockType()
)
);
}
@@ -444,7 +451,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
} else {
final TaskLock lock = toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction(
- TaskLockType.EXCLUSIVE,
+
TaskLocks.determineLockTypeForAppend(task.getContext()),
segmentId.getInterval(),
1000L
)
@@ -925,6 +932,11 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return TaskStatus.success(task.getId());
}
+ private TaskLockType getTaskLockType()
+ {
+ return TaskLocks.determineLockTypeForAppend(task.getContext());
+ }
+
private void checkPublishAndHandoffFailure() throws ExecutionException,
InterruptedException
{
// Check if any publishFuture failed.
@@ -1541,6 +1553,40 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return setEndOffsets(sequences, finish);
}
+  /**
+   * Registers a new version of a pending segment with the runner's appenderator.
+   *
+   * @param pendingSegmentVersions base pending segment and its new version
+   * @param req                    servlet request, used for the WRITE authorization check
+   * @return 200 on success, 400 if no payload was supplied, the status carried by a
+   *         {@link DruidException} if one is thrown, or 500 for any other failure
+   */
+  @POST
+  @Path("/pendingSegmentVersion")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response registerNewVersionOfPendingSegment(
+      PendingSegmentVersions pendingSegmentVersions,
+      // injected via @Context rather than read from the request payload
+      @Context final HttpServletRequest req
+  )
+  {
+    authorizationCheck(req, Action.WRITE);
+    if (pendingSegmentVersions == null) {
+      // Reject a missing payload up front: the catch-all handler below dereferences
+      // this object while logging and would otherwise fail with a second NPE.
+      return Response.status(Response.Status.BAD_REQUEST).build();
+    }
+    try {
+      ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(
+          pendingSegmentVersions.getBaseSegment(),
+          pendingSegmentVersions.getNewVersion()
+      );
+      return Response.ok().build();
+    }
+    catch (DruidException e) {
+      return Response
+          .status(e.getStatusCode())
+          .entity(new ErrorResponse(e))
+          .build();
+    }
+    catch (Exception e) {
+      log.error(
+          e,
+          "Could not register new version[%s] of pending segment[%s]",
+          pendingSegmentVersions.getNewVersion(),
+          pendingSegmentVersions.getBaseSegment()
+      );
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
+    }
+  }
+
public Map<String, Object> doGetRowStats()
{
Map<String, Object> returnMap = new HashMap<>();
@@ -1712,7 +1758,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
sequenceNumbers,
endOffsets,
false,
- exclusiveStartPartitions
+ exclusiveStartPartitions,
+ getTaskLockType()
);
log.info(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 161a36de2fd..b5a65e99462 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -25,8 +25,12 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.Committer;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -54,6 +58,7 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
private final String sequenceName;
private final Set<PartitionIdType> exclusiveStartPartitions;
private final Set<PartitionIdType> assignments;
+ private final TaskLockType taskLockType;
private final boolean sentinel;
/**
* Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This
lock is required because
@@ -73,7 +78,8 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
@JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType>
startOffsets,
@JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType>
endOffsets,
@JsonProperty("checkpointed") boolean checkpointed,
- @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType>
exclusiveStartPartitions
+ @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType>
exclusiveStartPartitions,
+ @JsonProperty("taskLockType") @Nullable TaskLockType taskLockType
)
{
Preconditions.checkNotNull(sequenceName);
@@ -86,6 +92,7 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
+ this.taskLockType = taskLockType;
this.exclusiveStartPartitions = exclusiveStartPartitions == null
? Collections.emptySet()
: exclusiveStartPartitions;
@@ -139,6 +146,12 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
}
}
+ @JsonProperty
+ public TaskLockType getTaskLockType()
+ {
+ return taskLockType;
+ }
+
@JsonProperty
public boolean isSentinel()
{
@@ -363,7 +376,7 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
);
}
- final SegmentTransactionalInsertAction action;
+ final TaskAction<SegmentPublishResult> action;
if (segmentsToPush.isEmpty()) {
// If a task ingested no data but made progress reading through its
assigned partitions,
@@ -395,19 +408,21 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
);
}
} else if (useTransaction) {
- action = SegmentTransactionalInsertAction.appendAction(
- segmentsToPush,
- runner.createDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(
- finalPartitions.getStream(),
- getStartOffsets(),
- exclusiveStartPartitions
- )
- ),
- runner.createDataSourceMetadata(finalPartitions)
+ final DataSourceMetadata startMetadata =
runner.createDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(
+ finalPartitions.getStream(),
+ getStartOffsets(),
+ exclusiveStartPartitions
+ )
);
+ final DataSourceMetadata endMetadata =
runner.createDataSourceMetadata(finalPartitions);
+ action = taskLockType == TaskLockType.APPEND
+ ?
SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush,
startMetadata, endMetadata)
+ :
SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata,
endMetadata);
} else {
- action = SegmentTransactionalInsertAction.appendAction(segmentsToPush,
null, null);
+ action = taskLockType == TaskLockType.APPEND
+ ? SegmentTransactionalAppendAction.forSegments(segmentsToPush)
+ :
SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null);
}
return toolbox.getTaskActionClient().submit(action);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 2f6cb008b84..1d05169e3fb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -97,6 +97,7 @@ import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import javax.annotation.Nonnull;
@@ -1092,6 +1093,25 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
+ public void registerNewVersionOfPendingSegment(
+ SegmentIdWithShardSpec basePendingSegment,
+ SegmentIdWithShardSpec newSegmentVersion
+ )
+ {
+ for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+ for (String taskId : taskGroup.taskIds()) {
+ taskClient.registerNewVersionOfPendingSegmentAsync(taskId,
basePendingSegment, newSegmentVersion);
+ }
+ }
+ for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values())
{
+ for (TaskGroup taskGroup : taskGroupList) {
+ for (String taskId : taskGroup.taskIds()) {
+ taskClient.registerNewVersionOfPendingSegmentAsync(taskId,
basePendingSegment, newSegmentVersion);
+ }
+ }
+ }
+ }
+
public ReentrantLock getRecordSupplierLock()
{
return recordSupplierLock;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
index b78efcbc346..69a8b6cc103 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
@@ -60,6 +61,11 @@ public class ActionsTestTask extends CommandQueueTask
return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.REPLACE,
interval));
}
+ public Void releaseLock(Interval interval)
+ {
+ return runAction(new LockReleaseAction(interval));
+ }
+
public TaskLock acquireAppendLockOn(Interval interval)
{
return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.APPEND,
interval));
@@ -75,7 +81,7 @@ public class ActionsTestTask extends CommandQueueTask
public SegmentPublishResult commitAppendSegments(DataSegment... segments)
{
return runAction(
- SegmentTransactionalAppendAction.create(Sets.newHashSet(segments))
+ SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments))
);
}
@@ -97,6 +103,28 @@ public class ActionsTestTask extends CommandQueueTask
);
}
+ public SegmentIdWithShardSpec allocateSegmentForTimestamp(
+ DateTime timestamp,
+ Granularity preferredSegmentGranularity,
+ String sequenceName
+ )
+ {
+ return runAction(
+ new SegmentAllocateAction(
+ getDataSource(),
+ timestamp,
+ Granularities.SECOND,
+ preferredSegmentGranularity,
+ getId() + "__" + sequenceName,
+ null,
+ false,
+ NumberedPartialShardSpec.instance(),
+ LockGranularity.TIME_CHUNK,
+ TaskLockType.APPEND
+ )
+ );
+ }
+
private <T> T runAction(TaskAction<T> action)
{
return execute(() -> client.submit(action));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 293503b1c72..22f21fb79b6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -66,11 +66,14 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -626,10 +629,10 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
// Allocate an append segment for v1
final ActionsTestTask appendTask1 = createAndStartTask();
- appendTask1.acquireAppendLockOn(YEAR_23);
final SegmentIdWithShardSpec pendingSegmentV11
= appendTask1.allocateSegmentForTimestamp(YEAR_23.getStart(),
Granularities.YEAR);
- Assert.assertEquals(segmentV10.getVersion(),
pendingSegmentV11.getVersion());
+ Assert.assertEquals(v1, pendingSegmentV11.getVersion());
+ Assert.assertEquals(YEAR_23, pendingSegmentV11.getInterval());
// Commit replace segment for v2
final ActionsTestTask replaceTask2 = createAndStartTask();
@@ -771,6 +774,90 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
verifyIntervalHasVisibleSegments(YEAR_23, segmentV10, segmentV11,
segmentV13);
}
+ @Test
+ public void testSegmentIsAllocatedAtLatestVersion()
+ {
+ final SegmentIdWithShardSpec pendingSegmentV01
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(),
Granularities.MONTH);
+ Assert.assertEquals(SEGMENT_V0, pendingSegmentV01.getVersion());
+ Assert.assertEquals(JAN_23, pendingSegmentV01.getInterval());
+
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ replaceTask.commitReplaceSegments(segmentV10);
+ verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10);
+
+ final SegmentIdWithShardSpec pendingSegmentV12
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(),
Granularities.MONTH);
+ Assert.assertNotEquals(pendingSegmentV01.asSegmentId(),
pendingSegmentV12.asSegmentId());
+ Assert.assertEquals(v1, pendingSegmentV12.getVersion());
+ Assert.assertEquals(JAN_23, pendingSegmentV12.getInterval());
+
+ replaceTask.releaseLock(JAN_23);
+ final ActionsTestTask replaceTask2 = createAndStartTask();
+ final String v2 = replaceTask2.acquireReplaceLockOn(JAN_23).getVersion();
+ final DataSegment segmentV20 = createSegment(JAN_23, v2);
+ replaceTask2.commitReplaceSegments(segmentV20);
+ verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV20);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV20);
+
+ final SegmentIdWithShardSpec pendingSegmentV23
+ = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(),
Granularities.MONTH);
+ Assert.assertNotEquals(pendingSegmentV01.asSegmentId(),
pendingSegmentV23.asSegmentId());
+ Assert.assertEquals(v2, pendingSegmentV23.getVersion());
+ Assert.assertEquals(JAN_23, pendingSegmentV23.getInterval());
+
+ // Commit the append segments
+ final DataSegment segmentV01 = asSegment(pendingSegmentV01);
+ final DataSegment segmentV12 = asSegment(pendingSegmentV12);
+ final DataSegment segmentV23 = asSegment(pendingSegmentV23);
+
+ Set<DataSegment> appendedSegments
+ = appendTask.commitAppendSegments(segmentV01, segmentV12,
segmentV23).getSegments();
+ Assert.assertEquals(3 + 3, appendedSegments.size());
+
+ // Verify that the original append segments have been committed
+ Assert.assertTrue(appendedSegments.remove(segmentV01));
+ Assert.assertTrue(appendedSegments.remove(segmentV12));
+ Assert.assertTrue(appendedSegments.remove(segmentV23));
+
+ // Verify that segmentV01 has been upgraded to both v1 and v2
+ final DataSegment segmentV11 = findSegmentWith(v1,
segmentV01.getLoadSpec(), appendedSegments);
+ Assert.assertNotNull(segmentV11);
+ final DataSegment segmentV21 = findSegmentWith(v2,
segmentV01.getLoadSpec(), appendedSegments);
+ Assert.assertNotNull(segmentV21);
+
+ // Verify that segmentV12 has been upgraded to v2
+ final DataSegment segmentV22 = findSegmentWith(v2,
segmentV12.getLoadSpec(), appendedSegments);
+ Assert.assertNotNull(segmentV22);
+
+ // Verify that segmentV23 is not downgraded to v1
+ final DataSegment segmentV13 = findSegmentWith(v1,
segmentV23.getLoadSpec(), appendedSegments);
+ Assert.assertNull(segmentV13);
+
+ verifyIntervalHasUsedSegments(
+ YEAR_23,
+ segmentV01,
+ segmentV10, segmentV11, segmentV12,
+ segmentV20, segmentV21, segmentV22, segmentV23
+ );
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV20, segmentV21,
segmentV22, segmentV23);
+ }
+
+ @Nullable
+ private DataSegment findSegmentWith(String version, Map<String, Object>
loadSpec, Set<DataSegment> segments)
+ {
+ for (DataSegment segment : segments) {
+ if (version.equals(segment.getVersion())
+ && Objects.equals(segment.getLoadSpec(), loadSpec)) {
+ return segment;
+ }
+ }
+
+ return null;
+ }
+
private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
final SegmentId id = pendingSegment.asSegmentId();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 4c761a6f71b..01dfc0f5d67 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus;
@@ -43,6 +44,7 @@ import
org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -55,6 +57,7 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
@@ -1248,6 +1251,80 @@ public class TaskLockboxTest
);
}
+ @Test
+ public void testGetLockedIntervalsForHigherPriorityExclusiveLock()
+ {
+ final Task task = NoopTask.ofPriority(50);
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ tryTimeChunkLock(
+ TaskLockType.APPEND,
+ task,
+ Intervals.of("2017/2018")
+ );
+
+ LockFilterPolicy requestForExclusiveLowerPriorityLock = new
LockFilterPolicy(
+ task.getDataSource(),
+ 75,
+ null
+ );
+
+ Map<String, List<Interval>> conflictingIntervals =
+
lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
+ Assert.assertTrue(conflictingIntervals.isEmpty());
+ }
+
+ @Test
+ public void testGetLockedIntervalsForLowerPriorityExclusiveLock()
+ {
+ final Task task = NoopTask.ofPriority(50);
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ tryTimeChunkLock(
+ TaskLockType.APPEND,
+ task,
+ Intervals.of("2017/2018")
+ );
+
+ LockFilterPolicy requestForExclusiveLowerPriorityLock = new
LockFilterPolicy(
+ task.getDataSource(),
+ 25,
+ null
+ );
+
+ Map<String, List<Interval>> conflictingIntervals =
+
lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
+ Assert.assertEquals(1, conflictingIntervals.size());
+ Assert.assertEquals(
+ Collections.singletonList(Intervals.of("2017/2018")),
+ conflictingIntervals.get(task.getDataSource())
+ );
+ }
+
+ @Test
+ public void testGetLockedIntervalsForLowerPriorityReplaceLock()
+ {
+ final Task task = NoopTask.ofPriority(50);
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ tryTimeChunkLock(
+ TaskLockType.APPEND,
+ task,
+ Intervals.of("2017/2018")
+ );
+
+ LockFilterPolicy requestForExclusiveLowerPriorityLock = new
LockFilterPolicy(
+ task.getDataSource(),
+ 25,
+ ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name())
+ );
+
+ Map<String, List<Interval>> conflictingIntervals =
+
lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
+ Assert.assertTrue(conflictingIntervals.isEmpty());
+ }
+
+
@Test
public void testExclusiveLockCompatibility()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
index aae07194bb9..fbe63ffe268 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
@@ -29,9 +29,8 @@ import org.apache.druid.segment.SegmentUtils;
import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Rule;
+import org.junit.Assert;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@@ -43,9 +42,6 @@ import java.util.Set;
@RunWith(MockitoJUnitRunner.class)
public class SequenceMetadataTest
{
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
@Mock
private SeekableStreamIndexTaskRunner mockSeekableStreamIndexTaskRunner;
@@ -59,7 +55,7 @@ public class SequenceMetadataTest
private TaskToolbox mockTaskToolbox;
@Test
- public void
testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty()
throws Exception
+ public void
testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty()
{
DataSegment dataSegment = DataSegment.builder()
.dataSource("foo")
@@ -76,16 +72,21 @@ public class SequenceMetadataTest
ImmutableMap.of(),
ImmutableMap.of(),
true,
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
- TransactionalSegmentPublisher transactionalSegmentPublisher =
sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner,
mockTaskToolbox, true);
+ TransactionalSegmentPublisher transactionalSegmentPublisher
+ = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner,
mockTaskToolbox, true);
- expectedException.expect(ISE.class);
- expectedException.expectMessage(
- "Stream ingestion task unexpectedly attempted to overwrite segments: "
+ SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
+ ISE exception = Assert.assertThrows(
+ ISE.class,
+ () ->
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment,
ImmutableSet.of(), null)
+ );
+ Assert.assertEquals(
+ "Stream ingestion task unexpectedly attempted to overwrite segments: "
+ + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment),
+ exception.getMessage()
);
-
-
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment,
ImmutableSet.of(), null);
}
@Test
@@ -109,7 +110,8 @@ public class SequenceMetadataTest
ImmutableMap.of(),
ImmutableMap.of(),
true,
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
TransactionalSegmentPublisher transactionalSegmentPublisher =
sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner,
mockTaskToolbox, false);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 34d2e44552a..d1c72485011 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -113,7 +113,11 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
}
@Override
- public List<DataSegment> retrieveUnusedSegmentsForInterval(String
dataSource, Interval interval, @Nullable Integer limit)
+ public List<DataSegment> retrieveUnusedSegmentsForInterval(
+ String dataSource,
+ Interval interval,
+ @Nullable Integer limit
+ )
{
synchronized (unusedSegments) {
Stream<DataSegment> resultStream = unusedSegments.stream();
@@ -175,6 +179,17 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
return SegmentPublishResult.ok(commitSegments(appendSegments));
}
+ @Override
+ public SegmentPublishResult commitAppendSegmentsAndMetadata(
+ Set<DataSegment> appendSegments,
+ Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ )
+ {
+ return SegmentPublishResult.ok(commitSegments(appendSegments));
+ }
+
@Override
public SegmentPublishResult commitSegmentsAndMetadata(
Set<DataSegment> segments,
@@ -222,6 +237,12 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
);
}
+ @Override
+ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
+ {
+ return Collections.emptyMap();
+ }
+
@Override
public int deletePendingSegmentsCreatedInInterval(String dataSource,
Interval deleteInterval)
{
diff --git
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
index bc3769b6236..7e249f72d0a 100644
---
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
+++
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
@@ -22,6 +22,7 @@ package org.apache.druid.java.util.emitter.service;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
/**
@@ -62,6 +63,18 @@ public class SegmentMetadataEvent implements Event
*/
private final boolean isCompacted;
+ public static SegmentMetadataEvent create(DataSegment segment, DateTime
eventTime)
+ {
+ return new SegmentMetadataEvent(
+ segment.getDataSource(),
+ eventTime,
+ segment.getInterval().getStart(),
+ segment.getInterval().getEnd(),
+ segment.getVersion(),
+ segment.getLastCompactionState() != null
+ );
+ }
+
public SegmentMetadataEvent(
String dataSource,
DateTime createdTime,
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
index 83a4fcba7dc..a926b004c0a 100644
---
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
@@ -21,6 +21,11 @@ package org.apache.druid.java.util.emitter.service;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@@ -51,4 +56,30 @@ public class SegmentMetadataEventTest
event.toMap()
);
}
+
+ @Test
+ public void testCreate()
+ {
+ final DataSegment segment = DataSegment.builder()
+ .dataSource("wiki")
+ .interval(Intervals.of("2023/2024"))
+ .shardSpec(new NumberedShardSpec(1,
1))
+ .version("v1")
+ .size(100)
+ .build();
+ final DateTime eventTime = DateTimes.nowUtc();
+ SegmentMetadataEvent event = SegmentMetadataEvent.create(segment,
eventTime);
+ Assert.assertEquals(
+ EventMap.builder()
+ .put(SegmentMetadataEvent.FEED, "segment_metadata")
+ .put(SegmentMetadataEvent.DATASOURCE, segment.getDataSource())
+ .put(SegmentMetadataEvent.CREATED_TIME, eventTime)
+ .put(SegmentMetadataEvent.START_TIME,
segment.getInterval().getStart())
+ .put(SegmentMetadataEvent.END_TIME,
segment.getInterval().getEnd())
+ .put(SegmentMetadataEvent.VERSION, segment.getVersion())
+ .put(SegmentMetadataEvent.IS_COMPACTED, false)
+ .build(),
+ event.toMap()
+ );
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 3cbabea78fa..2c2a6bc0f77 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -300,6 +300,21 @@ public interface IndexerMetadataStorageCoordinator
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
);
+ /**
+ * Commits segments created by an APPEND task. This method also handles
segment
+ * upgrade scenarios that may result from concurrent append and replace. Also
+ * commits start and end {@link DataSourceMetadata}.
+ *
+ * @see #commitAppendSegments
+ * @see #commitSegmentsAndMetadata
+ */
+ SegmentPublishResult commitAppendSegmentsAndMetadata(
+ Set<DataSegment> appendSegments,
+ Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ );
+
/**
* Commits segments created by a REPLACE task. This method also handles the
* segment upgrade scenarios that may result from concurrent append and
replace.
@@ -319,6 +334,23 @@ public interface IndexerMetadataStorageCoordinator
Set<ReplaceTaskLock> locksHeldByReplaceTask
);
+ /**
+ * Creates and inserts new IDs for the pending segments that overlap with the
given
+ * replace segments being committed. The newly created pending segment IDs:
+ * <ul>
+ * <li>Have the same interval and version as that of an overlapping segment
+ * committed by the REPLACE task.</li>
+ * <li>Cannot be committed but are only used to serve realtime queries
against
+ * those versions.</li>
+ * </ul>
+ *
+ * @param replaceSegments Segments being committed by a REPLACE task
+ * @return Map from originally allocated pending segment to its new upgraded
ID.
+ */
+ Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegmentsOverlappingWith(
+ Set<DataSegment> replaceSegments
+ );
+
/**
* Retrieves data source's metadata from the metadata store. Returns null if
there is no metadata.
*/
@@ -405,5 +437,4 @@ public interface IndexerMetadataStorageCoordinator
* @return DataSegment used segment corresponding to given id
*/
DataSegment retrieveSegmentForId(String id, boolean includeUnused);
-
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 7eaac692f7c..226663c3233 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -105,6 +105,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
private static final Logger log = new
Logger(IndexerSQLMetadataStorageCoordinator.class);
private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100;
+ private static final String UPGRADED_PENDING_SEGMENT_PREFIX =
"upgraded_to_version__";
+
private final ObjectMapper jsonMapper;
private final MetadataStorageTablesConfig dbTables;
private final SQLMetadataConnector connector;
@@ -237,44 +239,45 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
/**
* Fetches all the pending segments, whose interval overlaps with the given
- * search interval from the metadata store.
+ * search interval from the metadata store. Returns a Map from the
+ * pending segment ID to the sequence name.
*/
- private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
+ private Map<SegmentIdWithShardSpec, String>
getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
) throws IOException
{
- final Set<SegmentIdWithShardSpec> identifiers = new HashSet<>();
-
- final ResultIterator<byte[]> dbSegments =
+ final ResultIterator<PendingSegmentsRecord> dbSegments =
handle.createQuery(
StringUtils.format(
// This query might fail if the year has a different number of
digits
// See https://github.com/apache/druid/pull/11582 for a
similar issue
// Using long for these timestamps instead of varchar would
give correct time comparisons
- "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND
start < :end and %2$send%2$s > :start",
+ "SELECT sequence_name, payload FROM %1$s"
+ + " WHERE dataSource = :dataSource AND start < :end and
%2$send%2$s > :start",
dbTables.getPendingSegmentsTable(), connector.getQuoteString()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
- .map(ByteArrayMapper.FIRST)
+ .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
.iterator();
+ final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName =
new HashMap<>();
while (dbSegments.hasNext()) {
- final byte[] payload = dbSegments.next();
- final SegmentIdWithShardSpec identifier = jsonMapper.readValue(payload,
SegmentIdWithShardSpec.class);
+ PendingSegmentsRecord record = dbSegments.next();
+ final SegmentIdWithShardSpec identifier =
jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
if (interval.overlaps(identifier.getInterval())) {
- identifiers.add(identifier);
+ pendingSegmentToSequenceName.put(identifier, record.sequenceName);
}
}
dbSegments.close();
- return identifiers;
+ return pendingSegmentToSequenceName;
}
private SegmentTimeline getTimelineForIntervalsWithHandle(
@@ -417,7 +420,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
(handle, transactionStatus) -> {
final Set<DataSegment> segmentsToInsert = new
HashSet<>(replaceSegments);
segmentsToInsert.addAll(
- getSegmentsToUpgradeOnReplace(handle, replaceSegments,
locksHeldByReplaceTask)
+ createNewIdsOfAppendSegmentsAfterReplace(handle,
replaceSegments, locksHeldByReplaceTask)
);
return SegmentPublishResult.ok(
insertSegments(handle, segmentsToInsert)
@@ -438,33 +441,28 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
)
{
- verifySegmentsToCommit(appendSegments);
-
- final String dataSource = appendSegments.iterator().next().getDataSource();
- final Set<DataSegment> upgradedSegments = connector.retryTransaction(
- (handle, transactionStatus)
- -> getSegmentsToUpgradeOnAppend(handle, dataSource,
appendSegments),
- 0,
- SQLMetadataConnector.DEFAULT_MAX_TRIES
+ return commitAppendSegmentsAndMetadataInTransaction(
+ appendSegments,
+ appendSegmentToReplaceLock,
+ null,
+ null
);
+ }
- // Create entries for all required versions of the append segments
- final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
- allSegmentsToInsert.addAll(upgradedSegments);
-
- try {
- return connector.retryTransaction(
- (handle, transactionStatus) -> {
- insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
- return SegmentPublishResult.ok(insertSegments(handle,
allSegmentsToInsert));
- },
- 3,
- getSqlMetadataMaxRetry()
- );
- }
- catch (CallbackFailedException e) {
- return SegmentPublishResult.fail(e.getMessage());
- }
+ @Override
+ public SegmentPublishResult commitAppendSegmentsAndMetadata(
+ Set<DataSegment> appendSegments,
+ Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ )
+ {
+ return commitAppendSegmentsAndMetadataInTransaction(
+ appendSegments,
+ appendSegmentToReplaceLock,
+ startMetadata,
+ endMetadata
+ );
}
@Override
@@ -601,6 +599,125 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
+ @Override
+ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegmentsOverlappingWith(
+ Set<DataSegment> replaceSegments
+ )
+ {
+ if (replaceSegments.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ // Any replace interval has exactly one version of segments
+ final Map<Interval, DataSegment> replaceIntervalToMaxId = new HashMap<>();
+ for (DataSegment segment : replaceSegments) {
+ DataSegment committedMaxId =
replaceIntervalToMaxId.get(segment.getInterval());
+ if (committedMaxId == null
+ || committedMaxId.getShardSpec().getPartitionNum() <
segment.getShardSpec().getPartitionNum()) {
+ replaceIntervalToMaxId.put(segment.getInterval(), segment);
+ }
+ }
+
+ final String datasource =
replaceSegments.iterator().next().getDataSource();
+ return connector.retryWithHandle(
+ handle -> upgradePendingSegments(handle, datasource,
replaceIntervalToMaxId)
+ );
+ }
+
+ /**
+ * Creates and inserts new IDs for the pending segments contained in each
replace
+ * interval. The newly created pending segment IDs:
+ * <ul>
+ * <li>Have the same interval and version as that of an overlapping segment
+ * committed by the REPLACE task.</li>
+ * <li>Cannot be committed but are only used to serve realtime queries
against
+ * those versions.</li>
+ * </ul>
+ *
+ * @return Map from original pending segment to the new upgraded ID.
+ */
+ private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegments(
+ Handle handle,
+ String datasource,
+ Map<Interval, DataSegment> replaceIntervalToMaxId
+ ) throws IOException
+ {
+ final Map<SegmentCreateRequest, SegmentIdWithShardSpec>
newPendingSegmentVersions = new HashMap<>();
+ final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
pendingSegmentToNewId = new HashMap<>();
+
+ for (Map.Entry<Interval, DataSegment> entry :
replaceIntervalToMaxId.entrySet()) {
+ final Interval replaceInterval = entry.getKey();
+ final DataSegment maxSegmentId = entry.getValue();
+ final String replaceVersion = maxSegmentId.getVersion();
+
+ final int numCorePartitions =
maxSegmentId.getShardSpec().getNumCorePartitions();
+ int currentPartitionNumber =
maxSegmentId.getShardSpec().getPartitionNum();
+
+ final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
+ = getPendingSegmentsForIntervalWithHandle(handle, datasource,
replaceInterval);
+
+ for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
+ : overlappingPendingSegments.entrySet()) {
+ final SegmentIdWithShardSpec pendingSegmentId =
overlappingPendingSegment.getKey();
+ final String pendingSegmentSequence =
overlappingPendingSegment.getValue();
+ if (shouldUpgradePendingSegment(pendingSegmentId,
pendingSegmentSequence, replaceInterval, replaceVersion)) {
+ // Ensure unique sequence_name_prev_id_sha1 by setting
+ // sequence_prev_id -> pendingSegmentId
+ // sequence_name -> prefix + replaceVersion
+ SegmentIdWithShardSpec newId = new SegmentIdWithShardSpec(
+ datasource,
+ replaceInterval,
+ replaceVersion,
+ new NumberedShardSpec(++currentPartitionNumber,
numCorePartitions)
+ );
+ newPendingSegmentVersions.put(
+ new SegmentCreateRequest(
+ UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,
+ pendingSegmentId.toString(),
+ replaceVersion,
+ NumberedPartialShardSpec.instance()
+ ),
+ newId
+ );
+ pendingSegmentToNewId.put(pendingSegmentId, newId);
+ }
+ }
+ }
+
+ // Do not skip lineage check so that the sequence_name_prev_id_sha1
+ // includes hash of both sequence_name and prev_segment_id
+ int numInsertedPendingSegments = insertPendingSegmentsIntoMetastore(
+ handle,
+ newPendingSegmentVersions,
+ datasource,
+ false
+ );
+ log.info(
+ "Inserted total [%d] new versions for [%d] pending segments.",
+ numInsertedPendingSegments, newPendingSegmentVersions.size()
+ );
+
+ return pendingSegmentToNewId;
+ }
+
+ private boolean shouldUpgradePendingSegment(
+ SegmentIdWithShardSpec pendingSegmentId,
+ String pendingSegmentSequenceName,
+ Interval replaceInterval,
+ String replaceVersion
+ )
+ {
+ if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) {
+ return false;
+ } else if (!replaceInterval.contains(pendingSegmentId.getInterval())) {
+ return false;
+ } else {
+ // Do not upgrade already upgraded pending segment
+ return pendingSegmentSequenceName == null
+ ||
!pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
+ }
+ }
+
@Nullable
private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(
final Handle handle,
@@ -721,7 +838,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
handle,
createdSegments,
dataSource,
- interval,
skipSegmentLineageCheck
);
@@ -971,11 +1087,74 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- private void insertPendingSegmentsIntoMetastore(
+ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
+ Set<DataSegment> appendSegments,
+ Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+ @Nullable DataSourceMetadata startMetadata,
+ @Nullable DataSourceMetadata endMetadata
+ )
+ {
+ verifySegmentsToCommit(appendSegments);
+ if ((startMetadata == null && endMetadata != null)
+ || (startMetadata != null && endMetadata == null)) {
+ throw new IllegalArgumentException("start/end metadata pair must be
either null or non-null");
+ }
+
+ final String dataSource = appendSegments.iterator().next().getDataSource();
+ final Set<DataSegment> segmentIdsForNewVersions =
connector.retryTransaction(
+ (handle, transactionStatus)
+ -> createNewIdsForAppendSegments(handle, dataSource,
appendSegments),
+ 0,
+ SQLMetadataConnector.DEFAULT_MAX_TRIES
+ );
+
+ // Create entries for all required versions of the append segments
+ final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
+ allSegmentsToInsert.addAll(segmentIdsForNewVersions);
+
+ final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
+ try {
+ return connector.retryTransaction(
+ (handle, transactionStatus) -> {
+ metadataNotUpdated.set(false);
+
+ if (startMetadata != null) {
+ final DataStoreMetadataUpdateResult metadataUpdateResult
+ = updateDataSourceMetadataWithHandle(handle, dataSource,
startMetadata, endMetadata);
+
+ if (metadataUpdateResult.isFailed()) {
+ transactionStatus.setRollbackOnly();
+ metadataNotUpdated.set(true);
+
+ if (metadataUpdateResult.canRetry()) {
+ throw new
RetryTransactionException(metadataUpdateResult.getErrorMsg());
+ } else {
+ throw new
RuntimeException(metadataUpdateResult.getErrorMsg());
+ }
+ }
+ }
+
+ insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
+ return SegmentPublishResult.ok(insertSegments(handle,
allSegmentsToInsert));
+ },
+ 3,
+ getSqlMetadataMaxRetry()
+ );
+ }
+ catch (CallbackFailedException e) {
+ if (metadataNotUpdated.get()) {
+ // Return failed result if metadata was definitely not updated
+ return SegmentPublishResult.fail(e.getMessage());
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private int insertPendingSegmentsIntoMetastore(
Handle handle,
Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments,
String dataSource,
- Interval interval,
boolean skipSegmentLineageCheck
) throws JsonProcessingException
{
@@ -996,6 +1175,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
for (Map.Entry<SegmentIdWithShardSpec, SegmentCreateRequest> entry :
segmentIdToRequest.entrySet()) {
final SegmentCreateRequest request = entry.getValue();
final SegmentIdWithShardSpec segmentId = entry.getKey();
+ final Interval interval = segmentId.getInterval();
+
insertBatch.add()
.bind("id", segmentId.toString())
.bind("dataSource", dataSource)
@@ -1010,7 +1191,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
)
.bind("payload", jsonMapper.writeValueAsBytes(segmentId));
}
- insertBatch.execute();
+ int[] updated = insertBatch.execute();
+ return Arrays.stream(updated).sum();
}
private void insertPendingSegmentIntoMetastore(
@@ -1046,15 +1228,12 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
/**
- * Allocates and returns any extra versions that need to be committed for the
- * given append segments.
- * <p>
- * This is typically needed when a REPLACE task started and finished after
- * these append segments had already been allocated. As such,
- * there would be some used segments in the DB with versions higher than
these
- * append segments.
+ * Creates new IDs for the given append segments if a REPLACE task started
and
+ * finished after these append segments had already been allocated. The newly
+ * created IDs belong to the same interval and version as the segments
committed
+ * by the REPLACE task.
*/
- private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+ private Set<DataSegment> createNewIdsForAppendSegments(
Handle handle,
String dataSource,
Set<DataSegment> segmentsToAppend
@@ -1079,17 +1258,17 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Segments.INCLUDING_OVERSHADOWED
);
- final Map<String, Set<Interval>> committedVersionToIntervals = new
HashMap<>();
- final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new
HashMap<>();
+ final Map<String, Set<Interval>> overlappingVersionToIntervals = new
HashMap<>();
+ final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new
HashMap<>();
for (DataSegment segment : overlappingSegments) {
- committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v ->
new HashSet<>())
+ overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v ->
new HashSet<>())
.add(segment.getInterval());
- committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i ->
new HashSet<>())
+ overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i
-> new HashSet<>())
.add(segment);
}
final Set<DataSegment> upgradedSegments = new HashSet<>();
- for (Map.Entry<String, Set<Interval>> entry :
committedVersionToIntervals.entrySet()) {
+ for (Map.Entry<String, Set<Interval>> entry :
overlappingVersionToIntervals.entrySet()) {
final String upgradeVersion = entry.getKey();
Map<Interval, Set<DataSegment>> segmentsToUpgrade =
getSegmentsWithVersionLowerThan(
upgradeVersion,
@@ -1097,12 +1276,18 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
appendVersionToSegments
);
for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry :
segmentsToUpgrade.entrySet()) {
- Set<DataSegment> segmentsUpgradedToVersion = upgradeSegmentsToVersion(
+ final Interval upgradeInterval = upgradeEntry.getKey();
+ final Set<DataSegment> segmentsAlreadyOnVersion
+ = overlappingIntervalToSegments.getOrDefault(upgradeInterval,
Collections.emptySet())
+ .stream()
+ .filter(s ->
s.getVersion().equals(upgradeVersion))
+ .collect(Collectors.toSet());
+ Set<DataSegment> segmentsUpgradedToVersion =
createNewIdsForAppendSegmentsWithVersion(
handle,
upgradeVersion,
- upgradeEntry.getKey(),
+ upgradeInterval,
upgradeEntry.getValue(),
- committedIntervalToSegments
+ segmentsAlreadyOnVersion
);
log.info("Upgraded [%d] segments to version[%s].",
segmentsUpgradedToVersion.size(), upgradeVersion);
upgradedSegments.addAll(segmentsUpgradedToVersion);
@@ -1150,23 +1335,20 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
/**
- * Computes new Segment IDs for the {@code segmentsToUpgrade} being upgraded
- * to the given {@code upgradeVersion}.
+ * Computes new segment IDs that belong to the upgradeInterval and
upgradeVersion.
+ *
+ * @param committedSegments Segments that already exist in the
upgradeInterval
+ * at upgradeVersion.
*/
- private Set<DataSegment> upgradeSegmentsToVersion(
+ private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(
Handle handle,
String upgradeVersion,
- Interval interval,
+ Interval upgradeInterval,
Set<DataSegment> segmentsToUpgrade,
- Map<Interval, Set<DataSegment>> committedSegmentsByInterval
+ Set<DataSegment> committedSegments
) throws IOException
{
- final Set<DataSegment> committedSegments
- = committedSegmentsByInterval.getOrDefault(interval,
Collections.emptySet())
- .stream()
- .filter(s ->
s.getVersion().equals(upgradeVersion))
- .collect(Collectors.toSet());
-
+ // Find the committed segment with the highest partition number
SegmentIdWithShardSpec committedMaxId = null;
for (DataSegment committedSegment : committedSegments) {
if (committedMaxId == null
@@ -1175,14 +1357,14 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- // Get pending segments for the new version, if any
+ // Get pending segments for the new version to determine the next
partition number to allocate
final String dataSource =
segmentsToUpgrade.iterator().next().getDataSource();
- final Set<SegmentIdWithShardSpec> pendingSegments
- = getPendingSegmentsForIntervalWithHandle(handle, dataSource,
interval);
+ final Set<SegmentIdWithShardSpec> pendingSegmentIds
+ = getPendingSegmentsForIntervalWithHandle(handle, dataSource,
upgradeInterval).keySet();
+ final Set<SegmentIdWithShardSpec> allAllocatedIds = new
HashSet<>(pendingSegmentIds);
- // Determine new IDs for each append segment by taking into account both
- // committed and pending segments for this version
- final Set<DataSegment> upgradedSegments = new HashSet<>();
+ // Create new IDs for each append segment
+ final Set<DataSegment> newSegmentIds = new HashSet<>();
for (DataSegment segment : segmentsToUpgrade) {
SegmentCreateRequest request = new SegmentCreateRequest(
segment.getId() + "__" + upgradeVersion,
@@ -1190,19 +1372,21 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
upgradeVersion,
NumberedPartialShardSpec.instance()
);
- // allocate new segment id
+
+ // Create new segment ID based on committed segments, allocated pending
segments
+ // and new IDs created so far in this method
final SegmentIdWithShardSpec newId = createNewSegment(
request,
dataSource,
- interval,
+ upgradeInterval,
upgradeVersion,
committedMaxId,
- pendingSegments
+ allAllocatedIds
);
- // Add to set of pending segments so that shard specs are computed
taking the new id into account
- pendingSegments.add(newId);
- upgradedSegments.add(
+ // Update the set so that subsequent segment IDs use a higher partition
number
+ allAllocatedIds.add(newId);
+ newSegmentIds.add(
DataSegment.builder(segment)
.interval(newId.getInterval())
.version(newId.getVersion())
@@ -1211,7 +1395,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
- return upgradedSegments;
+ return newSegmentIds;
}
private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
@@ -1278,7 +1462,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendingSegments =
- getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
+ new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle,
dataSource, interval).keySet());
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
new HashMap<>();
final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new
HashMap<>();
@@ -1328,23 +1512,24 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
{
final PartialShardSpec partialShardSpec = request.getPartialShardSpec();
final String existingVersion = request.getVersion();
+ final Set<SegmentIdWithShardSpec> mutablePendingSegments = new
HashSet<>(pendingSegments);
// Include the committedMaxId while computing the overallMaxId
if (committedMaxId != null) {
- pendingSegments.add(committedMaxId);
+ mutablePendingSegments.add(committedMaxId);
}
// If there is an existing chunk, find the max id with the same version as
the existing chunk.
// There may still be a pending segment with a higher version (but no
corresponding used segments)
// which may generate a clash with an existing segment once the new id is
generated
final SegmentIdWithShardSpec overallMaxId =
- pendingSegments.stream()
- .filter(id ->
id.getShardSpec().sharePartitionSpace(partialShardSpec))
- .filter(id -> versionOfExistingChunk == null
- ||
id.getVersion().equals(versionOfExistingChunk))
-
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
- .thenComparing(id ->
id.getShardSpec().getPartitionNum()))
- .orElse(null);
+ mutablePendingSegments.stream()
+ .filter(id ->
id.getShardSpec().sharePartitionSpace(partialShardSpec))
+ .filter(id -> versionOfExistingChunk == null
+ ||
id.getVersion().equals(versionOfExistingChunk))
+
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
+ .thenComparing(id ->
id.getShardSpec().getPartitionNum()))
+ .orElse(null);
// Determine the version of the new segment
final String newSegmentVersion;
@@ -1484,10 +1669,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
- final Set<SegmentIdWithShardSpec> pendings =
getPendingSegmentsForIntervalWithHandle(
- handle,
- dataSource,
- interval
+ final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
+ getPendingSegmentsForIntervalWithHandle(handle, dataSource,
interval).keySet()
);
if (committedMaxId != null) {
pendings.add(committedMaxId);
@@ -1666,7 +1849,10 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return toInsertSegments;
}
- private Set<DataSegment> getSegmentsToUpgradeOnReplace(
+ /**
+ * Creates new versions of segments appended while a REPLACE task was in
progress.
+ */
+ private Set<DataSegment> createNewIdsOfAppendSegmentsAfterReplace(
final Handle handle,
final Set<DataSegment> replaceSegments,
final Set<ReplaceTaskLock> locksHeldByReplaceTask
diff --git
a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
new file mode 100644
index 00000000000..88ab4673aa8
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specifies a policy to filter active locks held by a datasource
+ */
+public class LockFilterPolicy
+{
+ private final String datasource;
+ private final int priority;
+ private final Map<String, Object> context;
+
+ @JsonCreator
+ public LockFilterPolicy(
+ @JsonProperty("datasource") String datasource,
+ @JsonProperty("priority") int priority,
+ @JsonProperty("context") Map<String, Object> context
+ )
+ {
+ this.datasource = datasource;
+ this.priority = priority;
+ this.context = context == null ? Collections.emptyMap() : context;
+ }
+
+ @JsonProperty
+ public String getDatasource()
+ {
+ return datasource;
+ }
+
+ @JsonProperty
+ public int getPriority()
+ {
+ return priority;
+ }
+
+ @JsonProperty
+ public Map<String, Object> getContext()
+ {
+ return context;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LockFilterPolicy that = (LockFilterPolicy) o;
+ return Objects.equals(datasource, that.datasource)
+ && priority == that.priority
+ && Objects.equals(context, that.context);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(datasource, priority, context);
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index b4391dc0a8e..6a8e515b327 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.joda.time.Interval;
@@ -185,15 +186,15 @@ public interface OverlordClient
ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses();
/**
- * Returns a list of intervals locked by higher priority tasks for each
datasource.
+ * Returns a list of intervals locked by higher priority conflicting lock
types
*
- * @param minTaskPriority Minimum task priority for each datasource. Only
the intervals that are locked by tasks with
- * equal or higher priority than this are returned.
- *
- * @return Map from dtasource name to list of intervals locked by tasks that
have priority greater than or equal to
- * the {@code minTaskPriority} for that datasource.
+ * @param lockFilterPolicies List of all filters for different datasources
+ * @return Map from datasource name to list of intervals locked by tasks
that have a conflicting lock type with
+ * priority greater than or equal to the priority of the
{@code LockFilterPolicy} for that datasource.
*/
- ListenableFuture<Map<String, List<Interval>>>
findLockedIntervals(Map<String, Integer> minTaskPriority);
+ ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+ List<LockFilterPolicy> lockFilterPolicies
+ );
/**
* Deletes pending segment records from the metadata store for a particular
datasource. Records with
diff --git
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
index 306c909e404..d7fab4b75fa 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
@@ -39,6 +39,7 @@ import
org.apache.druid.java.util.common.parsers.CloseableIterator;
import
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
@@ -188,14 +189,16 @@ public class OverlordClientImpl implements OverlordClient
}
@Override
- public ListenableFuture<Map<String, List<Interval>>>
findLockedIntervals(Map<String, Integer> minTaskPriority)
+ public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+ List<LockFilterPolicy> lockFilterPolicies
+ )
{
- final String path = "/druid/indexer/v1/lockedIntervals";
+ final String path = "/druid/indexer/v1/lockedIntervals/v2";
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.POST, path)
- .jsonContent(jsonMapper, minTaskPriority),
+ .jsonContent(jsonMapper, lockFilterPolicies),
new BytesFullResponseHandler()
),
holder -> {
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 46315dbc0de..81cd4db2556 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -70,6 +70,8 @@ import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -92,6 +94,8 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
+ private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor>
newIdToBasePendingSegment
+ = new ConcurrentHashMap<>();
public SinkQuerySegmentWalker(
String dataSource,
@@ -182,7 +186,8 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
specs,
- descriptor -> {
+ newDescriptor -> {
+ final SegmentDescriptor descriptor =
newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
descriptor.getInterval(),
descriptor.getVersion(),
@@ -297,6 +302,17 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
);
}
+ public void registerNewVersionOfPendingSegment(
+ SegmentIdWithShardSpec basePendingSegment,
+ SegmentIdWithShardSpec newSegmentVersion
+ )
+ {
+ newIdToBasePendingSegment.put(
+ newSegmentVersion.asSegmentId().toDescriptor(),
+ basePendingSegment.asSegmentId().toDescriptor()
+ );
+ }
+
@VisibleForTesting
String getDataSource()
{
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index e95852bfddb..f21f67ed504 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -72,6 +72,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
@@ -86,6 +87,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -153,6 +155,9 @@ public class StreamAppenderator implements Appenderator
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
+ baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+
private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService pushExecutor = null;
// use intermediate executor so that deadlock conditions can be prevented
@@ -998,7 +1003,7 @@ public class StreamAppenderator implements Appenderator
log.debug("Shutting down immediately...");
for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
try {
- segmentAnnouncer.unannounceSegment(entry.getValue().getSegment());
+ unannounceAllVersionsOfSegment(entry.getValue().getSegment());
}
catch (Exception e) {
log.makeAlert(e, "Failed to unannounce segment[%s]",
schema.getDataSource())
@@ -1026,6 +1031,66 @@ public class StreamAppenderator implements Appenderator
}
}
+ /**
+ * Unannounces the given base segment and all its upgraded versions.
+ */
+ private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws
IOException
+ {
+ segmentAnnouncer.unannounceSegment(baseSegment);
+
+ final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
+ = baseSegmentToUpgradedVersions.remove(baseSegment.getId());
+ if (upgradedVersionsOfSegment == null ||
upgradedVersionsOfSegment.isEmpty()) {
+ return;
+ }
+
+ for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
+ final DataSegment newSegment = new DataSegment(
+ newId.getDataSource(),
+ newId.getInterval(),
+ newId.getVersion(),
+ baseSegment.getLoadSpec(),
+ baseSegment.getDimensions(),
+ baseSegment.getMetrics(),
+ newId.getShardSpec(),
+ baseSegment.getBinaryVersion(),
+ baseSegment.getSize()
+ );
+ segmentAnnouncer.unannounceSegment(newSegment);
+ }
+ }
+
+ public void registerNewVersionOfPendingSegment(
+ SegmentIdWithShardSpec basePendingSegment,
+ SegmentIdWithShardSpec newSegmentVersion
+ ) throws IOException
+ {
+ if (!sinks.containsKey(basePendingSegment) ||
droppingSinks.contains(basePendingSegment)) {
+ return;
+ }
+
+ // Update query mapping with SinkQuerySegmentWalker
+ ((SinkQuerySegmentWalker)
texasRanger).registerNewVersionOfPendingSegment(basePendingSegment,
newSegmentVersion);
+
+ // Announce segments
+ final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
+
+ final DataSegment newSegment = new DataSegment(
+ newSegmentVersion.getDataSource(),
+ newSegmentVersion.getInterval(),
+ newSegmentVersion.getVersion(),
+ baseSegment.getLoadSpec(),
+ baseSegment.getDimensions(),
+ baseSegment.getMetrics(),
+ newSegmentVersion.getShardSpec(),
+ baseSegment.getBinaryVersion(),
+ baseSegment.getSize()
+ );
+ segmentAnnouncer.announceSegment(newSegment);
+
baseSegmentToUpgradedVersions.computeIfAbsent(basePendingSegment.asSegmentId(),
id -> new HashSet<>())
+ .add(newSegmentVersion);
+ }
+
private void lockBasePersistDirectory()
{
if (basePersistDirLock == null) {
@@ -1327,7 +1392,7 @@ public class StreamAppenderator implements Appenderator
// Unannounce the segment.
try {
- segmentAnnouncer.unannounceSegment(sink.getSegment());
+ unannounceAllVersionsOfSegment(sink.getSegment());
}
catch (Exception e) {
log.makeAlert(e, "Failed to unannounce segment[%s]",
schema.getDataSource())
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 0c08da7c8dd..ab92f180149 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -42,6 +42,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -175,7 +176,7 @@ public class CompactSegments implements
CoordinatorCustomDuty
// Skip all the intervals locked by higher priority tasks for each
datasource
// This must be done after the invalid compaction tasks are cancelled
// in the loop above so that their intervals are not considered locked
- getLockedIntervalsToSkip(compactionConfigList).forEach(
+ getLockedIntervals(compactionConfigList).forEach(
(dataSource, intervals) ->
intervalsToSkipCompaction
.computeIfAbsent(dataSource, ds -> new ArrayList<>())
@@ -247,6 +248,7 @@ public class CompactSegments implements
CoordinatorCustomDuty
/**
* Gets a List of Intervals locked by higher priority tasks for each
datasource.
+ * However, when using a REPLACE lock for compaction, intervals locked with
any APPEND lock will not be returned.
* Since compaction tasks submitted for these Intervals would have to wait
anyway,
* we skip these Intervals until the next compaction run.
* <p>
@@ -254,25 +256,21 @@ public class CompactSegments implements
CoordinatorCustomDuty
* though they lock only a Segment and not the entire Interval. Thus,
* a compaction task will not be submitted for an Interval if
* <ul>
- * <li>either the whole Interval is locked by a higher priority Task</li>
+ * <li>either the whole Interval is locked by a higher priority Task with
an incompatible lock type</li>
* <li>or there is atleast one Segment in the Interval that is locked by a
* higher priority Task</li>
* </ul>
*/
- private Map<String, List<Interval>> getLockedIntervalsToSkip(
+ private Map<String, List<Interval>> getLockedIntervals(
List<DataSourceCompactionConfig> compactionConfigs
)
{
- final Map<String, Integer> minTaskPriority = compactionConfigs
+ final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
.stream()
- .collect(
- Collectors.toMap(
- DataSourceCompactionConfig::getDataSource,
- DataSourceCompactionConfig::getTaskPriority
- )
- );
+ .map(config -> new LockFilterPolicy(config.getDataSource(),
config.getTaskPriority(), config.getTaskContext()))
+ .collect(Collectors.toList());
final Map<String, List<Interval>> datasourceToLockedIntervals =
- new
HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(minTaskPriority),
true));
+ new
HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies),
true));
LOG.debug(
"Skipping the following intervals for Compaction as they are currently
locked: %s",
datasourceToLockedIntervals
diff --git
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 1e146736da1..301d9631b7d 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -61,6 +61,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -870,29 +871,37 @@ public class DataSourcesResource
final Interval theInterval = Intervals.of(interval);
final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval,
version, partitionNumber);
final DateTime now = DateTimes.nowUtc();
- // dropped means a segment will never be handed off, i.e it completed
hand off
- // init to true, reset to false only if this segment can be loaded by
rules
- boolean dropped = true;
+
+ // A segment that is not eligible for load will never be handed off
+ boolean notEligibleForLoad = true;
for (Rule rule : rules) {
if (rule.appliesTo(theInterval, now)) {
if (rule instanceof LoadRule) {
- dropped = false;
+ notEligibleForLoad = false;
}
break;
}
}
- if (dropped) {
+ if (notEligibleForLoad) {
return Response.ok(true).build();
}
- TimelineLookup<String, SegmentLoadInfo> timeline =
serverInventoryView.getTimeline(
+ VersionedIntervalTimeline<String, SegmentLoadInfo> timeline =
serverInventoryView.getTimeline(
new TableDataSource(dataSourceName)
);
if (timeline == null) {
- log.debug("No timeline found for datasource[%s]", dataSourceName);
+ log.error("No timeline found for datasource[%s]", dataSourceName);
return Response.ok(false).build();
}
+ // A segment with version lower than that of the latest chunk might
never get handed off
+ // If there are multiple versions of this segment (due to a concurrent
replace task),
+ // only the latest version would get handed off
+ List<TimelineObjectHolder<String, SegmentLoadInfo>> timelineObjects =
timeline.lookup(Intervals.of(interval));
+ if (!timelineObjects.isEmpty() &&
timelineObjects.get(0).getVersion().compareTo(version) > 0) {
+ return Response.ok(true).build();
+ }
+
Iterable<ImmutableSegmentLoadInfo> servedSegmentsInInterval =
prepareServedSegmentsInInterval(timeline, theInterval);
if (isSegmentLoaded(servedSegmentsInInterval, descriptor)) {
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
index 6882c6b762d..42ca59ffee7 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
@@ -24,6 +24,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.joda.time.Interval;
@@ -95,7 +96,9 @@ public class NoopOverlordClient implements OverlordClient
}
@Override
- public ListenableFuture<Map<String, List<Interval>>>
findLockedIntervals(Map<String, Integer> minTaskPriority)
+ public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+ List<LockFilterPolicy> lockFilterPolicies
+ )
{
throw new UnsupportedOperationException();
}
diff --git
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index 5b9a88d5841..7ba5916a771 100644
---
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
@@ -219,13 +220,15 @@ public class OverlordClientImplTest
@Test
public void test_findLockedIntervals() throws Exception
{
- final Map<String, Integer> priorityMap = ImmutableMap.of("foo", 3);
final Map<String, List<Interval>> lockMap =
ImmutableMap.of("foo",
Collections.singletonList(Intervals.of("2000/2001")));
+ final List<LockFilterPolicy> requests = ImmutableList.of(
+ new LockFilterPolicy("foo", 3, null)
+ );
serviceClient.expectAndRespond(
- new RequestBuilder(HttpMethod.POST,
"/druid/indexer/v1/lockedIntervals")
- .jsonContent(jsonMapper, priorityMap),
+ new RequestBuilder(HttpMethod.POST,
"/druid/indexer/v1/lockedIntervals/v2")
+ .jsonContent(jsonMapper, requests),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(lockMap)
@@ -233,18 +236,20 @@ public class OverlordClientImplTest
Assert.assertEquals(
lockMap,
- overlordClient.findLockedIntervals(priorityMap).get()
+ overlordClient.findLockedIntervals(requests).get()
);
}
@Test
public void test_findLockedIntervals_nullReturn() throws Exception
{
- final Map<String, Integer> priorityMap = ImmutableMap.of("foo", 3);
+ final List<LockFilterPolicy> requests = ImmutableList.of(
+ new LockFilterPolicy("foo", 3, null)
+ );
serviceClient.expectAndRespond(
- new RequestBuilder(HttpMethod.POST,
"/druid/indexer/v1/lockedIntervals")
- .jsonContent(jsonMapper, priorityMap),
+ new RequestBuilder(HttpMethod.POST,
"/druid/indexer/v1/lockedIntervals/v2")
+ .jsonContent(jsonMapper, requests),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(null)
@@ -252,7 +257,7 @@ public class OverlordClientImplTest
Assert.assertEquals(
Collections.emptyMap(),
- overlordClient.findLockedIntervals(priorityMap).get()
+ overlordClient.findLockedIntervals(requests).get()
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 2926dbd6d70..43b1c50c969 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -58,6 +58,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
@@ -1994,8 +1995,11 @@ public class CompactSegmentsTest
return Futures.immediateFuture(null);
}
+
@Override
- public ListenableFuture<Map<String, List<Interval>>>
findLockedIntervals(Map<String, Integer> minTaskPriority)
+ public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+ List<LockFilterPolicy> lockFilterPolicies
+ )
{
return Futures.immediateFuture(lockedIntervals);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]