imply-cheddar commented on code in PR #14407: URL: https://github.com/apache/druid/pull/14407#discussion_r1231775301
##########  indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.CriticalAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.indexing.overlord.TaskLockInfo;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
+ * your task for the segment intervals.
+ */

Review Comment:
> The segment versions must all be less than or equal to a lock held by your task for the segment intervals.

Is this true? We would expect replaces to cause the append tasks to generate segments with versions higher than their lock too, right?

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
+{
+  private final Set<DataSegment> segments;
+
+  @Nullable
+  private final DataSourceMetadata startMetadata;
+  @Nullable
+  private final DataSourceMetadata endMetadata;
+  @Nullable
+  private final String dataSource;

Review Comment:
As far as I can tell, this field only exists for the case where we are committing metadata without committing segments. That should be its own action anyway, rather than overloaded into this one.
I'm assuming that you just C&P'd this from the other actions that are being used. Whenever you C&P code, make sure that you go back over the code to validate that it's doing the minimal set of things required for the task.

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+    final Set<DataSegment> allSegments = new HashSet<>(segments);
+
+    String datasource = task.getDataSource();
+    Map<Interval, TaskLock> replaceLocks = new HashMap<>();
+    for (TaskLock lock : TaskLocks.findReplaceLocksForSegments(datasource, toolbox.getTaskLockbox(), segments)) {
+      replaceLocks.put(lock.getInterval(), lock);
+    }

Review Comment:
stream-of-consciousness: I wonder, are we guaranteed that we will never have 2 locks open for the same interval? I think we are...

##########  server/src/main/java/org/apache/druid/indexing/overlord/TaskLockInfo.java: ##########

@@ -0,0 +1,66 @@
+package org.apache.druid.indexing.overlord;
+
+import org.joda.time.Interval;
+
+import java.util.Objects;
+
+public class TaskLockInfo

Review Comment:
Why do we need this class? It appears to just be yet another holder for things already in `TaskLock`; why not just use `TaskLock`?

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java: ##########

Review Comment:
It looks like you haven't added any tests for the behavior added/adjusted here. Please make sure that you do that.

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java: ##########

@@ -724,9 +725,12 @@ public static NonnullPair<Interval, String> findIntervalAndVersion(
       }
     }
     // We don't have a lock for this interval, so we should lock it now.
+    if (taskLockType == null) {
+      taskLockType = TaskLockType.EXCLUSIVE;
+    }

Review Comment:
Which call-site of this method passes in null? I betcha it's better to change the call-sites than to change this. If you don't want to change the call-sites for some reason, then overload the method (keep the original 4-arg method and add a 5-arg method, then make the 4-arg method call the 5-arg method).

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java: ##########

@@ -0,0 +1,336 @@
+/**
+ * A test Task which mimics an appending task by having similar interactions with the overlord.

Review Comment:
If this is a "test task", why is it not in the test sources?

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java: ##########

+/**
+ * A test Task which mimics an appending task by having similar interactions with the overlord.
+ *
+ * Begins running by acquiring an APPEND lock and immediately allocates pending segments.
+ *
+ * Task ends after publishing these pending segments and relevant metadata entries in a transaction
+ * Replace lock exists with version V3
+ * V0 -> PS0 -> append task begins
+ * V1 -> (S1-0) -> replace task begins and published and completed
+ * V2 -> (S2-0) -> replace task begins and published and completed
+ * V3 -> Replace acquired lock
+ *
+ * append task publishes PS0 -> S0-0, S1-1, S2-1 ; (S0-0, V3); Append task has completed
+ *
+ * (S3-0, 1) today
+ * (S3-0, 2), (S3-1, 2) needs to happen
+ * V3 replace task finishes -> (S3-0, S0-0 == S3-1)
+ *
+ * segment metadata -> Publish all pending segments and also create copies for greater versions for which used segments exist
+ * forward metadata -> Publish mapping of the original segments to the EXCLUSIVE lock held for the same interval when present
+ */
+public class AppendTask extends AbstractTask
+{
+  private final Interval interval;
+  private final Granularity segmentGranularity;
+  private final String lockType;
+  private final int priority;
+  private final int numPartitions;
+  private final CountDownLatch readyLatch = new CountDownLatch(1);
+  private final CountDownLatch runLatch = new CountDownLatch(1);
+  private final CountDownLatch segmentAllocationComplete = new CountDownLatch(1);
+  private final CountDownLatch runComplete = new CountDownLatch(1);
+  private final CountDownLatch readyComplete = new CountDownLatch(1);
+
+  private final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>();
+
+  private TaskToolbox toolbox;
+  private final AtomicInteger sequenceId = new AtomicInteger(0);
+
+  public AppendTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+      @JsonProperty("context") Map<String, Object> context
+  )
+  {
+    super(
+        id == null ? StringUtils.format("replace_%s_%s", DateTimes.nowUtc(), UUID.randomUUID().toString()) : id,
+        dataSource == null ? "none" : dataSource,
+        context,
+        IngestionMode.APPEND
+    );
+    this.interval = interval;
+    this.segmentGranularity = segmentGranularity;
+    this.lockType = getContextValue(Tasks.TASK_LOCK_TYPE, "EXCLUSIVE");
+    this.priority = getContextValue(Tasks.PRIORITY_KEY, 0);
+    this.numPartitions = getContextValue("numPartitions", 0);

Review Comment:
Why pass these through the context? You can just set them...

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+    if (segments.isEmpty()) {
+      // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable),
+      // but still needs to update metadata with the progress that the task made.
+      try {
+        retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly(
+            dataSource,
+            startMetadata,
+            endMetadata
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return retVal;
+    }

Review Comment:
Can we move this to a specific action that only commits metadata, instead of overloading this action? On top of that, I don't know why this case is special or why it should exist as part of the append-segment action.

##########  indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+    Map<DataSegment, TaskLockInfo> appendSegmentLockMap = new HashMap<>();
+    Set<TaskLockInfo> taskLockInfos = new HashSet<>();
+    for (TaskLock taskLock : replaceLocks.values()) {
+      taskLockInfos.add(getTaskLockInfo(taskLock));
+    }
+
+    for (DataSegment segment : segments) {
+      Interval interval = segment.getInterval();
+      for (Interval key : replaceLocks.keySet()) {
+        if (key.contains(interval)) {
+          appendSegmentLockMap.put(segment, getTaskLockInfo(replaceLocks.get(key)));
+        }
+      }
+    }
+
+    try {
+      retVal = toolbox.getTaskLockbox().doInCriticalSection(
+          task,
+          allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+          CriticalAction.<SegmentPublishResult>builder()
+              .onValidLocks(
+                  () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(

Review Comment:
I find myself wishing that this method was `commitSegments()` instead of `announceHistoricalSegments()`, 'cause "announce" makes it sound like announcing to the cluster...
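For illustration, the suggestion above to pull the metadata-only commit out into its own action could be sketched roughly as follows. Everything here is hypothetical: the class names `CommitMetadataOnlyAction` and `AppendSegmentsActionSketch` are invented, and the Druid types (DataSourceMetadata, SegmentPublishResult, the storage coordinator) are simplified to plain strings and sets so the sketch stands alone.

```java
// Hypothetical sketch of splitting the metadata-only commit into its own action.
// Names and types here are simplified stand-ins, not the real Druid signatures.
import java.util.Set;

interface MetadataCoordinator
{
  // Stand-in for the coordinator's commitMetadataOnly(...) call.
  String commitMetadataOnly(String dataSource, String startMetadata, String endMetadata);
}

class CommitMetadataOnlyAction
{
  private final String dataSource;
  private final String startMetadata;
  private final String endMetadata;

  CommitMetadataOnlyAction(String dataSource, String startMetadata, String endMetadata)
  {
    this.dataSource = dataSource;
    this.startMetadata = startMetadata;
    this.endMetadata = endMetadata;
  }

  String perform(MetadataCoordinator coordinator)
  {
    // The "no segments, just record stream progress" path lives here now.
    return coordinator.commitMetadataOnly(dataSource, startMetadata, endMetadata);
  }
}

class AppendSegmentsActionSketch
{
  private final Set<String> segments;

  AppendSegmentsActionSketch(Set<String> segments)
  {
    // With the metadata-only path split out, an empty segment set is simply invalid input.
    if (segments.isEmpty()) {
      throw new IllegalArgumentException("use CommitMetadataOnlyAction to commit metadata without segments");
    }
    this.segments = segments;
  }

  Set<String> getSegments()
  {
    return segments;
  }
}
```

With this split, the append action never branches on an empty segment set, and each action does exactly one thing.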
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+      retVal = toolbox.getTaskLockbox().doInCriticalSection(
+          task,
+          allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),

Review Comment: Segments are likely to have the same interval repeated multiple times, right? Might as well make it a set.
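The reviewer's suggestion is a one-collector change. A minimal stand-in sketch (plain records replace Druid's `DataSegment` and joda-time's `Interval`, which are assumptions here) showing why `Collectors.toSet()` deduplicates the intervals handed to the critical section:

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class IntervalDedup
{
  // Stand-ins for org.joda.time.Interval and DataSegment.
  public record Interval(long startMillis, long endMillis) {}
  public record Segment(Interval interval) {}

  // Collecting to a Set deduplicates intervals shared by multiple segments,
  // so the critical section sees each interval once instead of once per segment.
  public static Set<Interval> distinctIntervals(Collection<Segment> segments)
  {
    return segments.stream().map(Segment::interval).collect(Collectors.toSet());
  }

  public static void main(String[] args)
  {
    Interval day = new Interval(0, 86_400_000L);
    List<Segment> segments = List.of(
        new Segment(day),
        new Segment(day),
        new Segment(new Interval(86_400_000L, 172_800_000L))
    );
    // 3 segments, but only 2 distinct intervals reach the lock check.
    System.out.println(distinctIntervals(segments).size());
  }
}
```

Records compare by component values, so identical intervals collapse naturally in the set.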
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+                      segments,
+                      null,
+                      startMetadata,
+                      endMetadata,
+                      appendSegmentLockMap,
+                      taskLockInfos,
+                      true
+                  )
+              )
+              .onInvalidLocks(
+                  () -> SegmentPublishResult.fail(
+                      "Invalid task locks. Maybe they are revoked by a higher priority task."
+                      + " Please check the overlord log for details."
+                  )

Review Comment: I wish we could get a better error message than this... Ah well
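For what a more specific failure message could look like, here is a hypothetical sketch (the helper name and message format are assumptions, not Druid code) that names the task and the affected intervals instead of deferring entirely to the overlord log:

```java
import java.util.List;

public class LockFailureMessage
{
  // Builds a failure message that carries the task id and the intervals whose
  // lock validation failed, so the caller sees actionable context directly.
  public static String describe(String taskId, List<String> intervals)
  {
    return String.format(
        "Lock validation failed for task[%s] on intervals%s."
        + " The locks may have been revoked by a higher-priority task.",
        taskId,
        intervals
    );
  }
}
```

The point of the sketch is only that the message can embed the identifiers the publish path already has in hand.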
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java: ##########

+/**
+ * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
+ * your task for the segment intervals.
+ */
+public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
+{
+  /**
+   * Set of segments that were fully overshadowed by the new segments, {@link SegmentTransactionalReplaceAction#segments}
+   */
+  @Nullable
+  private final Set<DataSegment> segmentsToBeOverwritten;
+  /**
+   * Set of segments to be inserted into metadata storage
+   */
+  private final Set<DataSegment> segments;
+  /**
+   * Set of segments to be dropped (marked unused) when the new segments, {@link SegmentTransactionalReplaceAction#segments},
+   * are inserted into metadata storage.
+   */
+  @Nullable
+  private final Set<DataSegment> segmentsToBeDropped;
+
+  @Nullable
+  private final DataSourceMetadata startMetadata;
+  @Nullable
+  private final DataSourceMetadata endMetadata;
+  @Nullable
+  private final String dataSource;
+
+  public static SegmentTransactionalReplaceAction overwriteAction(
+      @Nullable Set<DataSegment> segmentsToBeOverwritten,
+      @Nullable Set<DataSegment> segmentsToBeDropped,
+      Set<DataSegment> segmentsToPublish
+  )
+  {
+    return new SegmentTransactionalReplaceAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null);
+  }
+
+  @JsonCreator
+  private SegmentTransactionalReplaceAction(
+      @JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment> segmentsToBeOverwritten,
+      @JsonProperty("segmentsToBeDropped") @Nullable Set<DataSegment> segmentsToBeDropped,
+      @JsonProperty("segments") @Nullable Set<DataSegment> segments,
+      @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
+      @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
+      @JsonProperty("dataSource") @Nullable String dataSource
+  )
+  {
+    this.segmentsToBeOverwritten = segmentsToBeOverwritten;
+    this.segmentsToBeDropped = segmentsToBeDropped;
+    this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
+    this.startMetadata = startMetadata;
+    this.endMetadata = endMetadata;
+    this.dataSource = dataSource;
+  }
+
+  @JsonProperty
+  @Nullable
+  public Set<DataSegment> getSegmentsToBeOverwritten()
+  {
+    return segmentsToBeOverwritten;
+  }
+
+  @JsonProperty
+  @Nullable
+  public Set<DataSegment> getSegmentsToBeDropped()
+  {
+    return segmentsToBeDropped;
+  }
+
+    final Set<DataSegment> allSegments = new HashSet<>(segments);
+
+    TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments);
+
+    String datasource = task.getDataSource();
+    Map<Interval, TaskLock> replaceLocks = new HashMap<>();
+    for (TaskLock lock : TaskLocks.findReplaceLocksForSegments(datasource, toolbox.getTaskLockbox(), segments)) {

Review Comment: Why is the replace action finding replace locks? Shouldn't it be finding append locks and figuring out if there are any that overlap?
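A sketch of the check the reviewer has in mind, using a stand-in `Interval` record (Druid uses joda-time's `Interval`, whose `overlaps` behaves similarly for half-open intervals; the method name `overlappingAppendLocks` is an assumption for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class OverlapCheck
{
  // Half-open [start, end) interval stand-in.
  public record Interval(long start, long end)
  {
    public boolean overlaps(Interval other)
    {
      // Two half-open intervals overlap iff each starts before the other ends.
      return start < other.end && other.start < end;
    }
  }

  // A replace action could look for *append* locks whose intervals overlap the
  // segment being replaced, rather than re-deriving its own replace locks.
  public static List<Interval> overlappingAppendLocks(List<Interval> appendLockIntervals, Interval segmentInterval)
  {
    List<Interval> result = new ArrayList<>();
    for (Interval lock : appendLockIntervals) {
      if (lock.overlaps(segmentInterval)) {
        result.add(lock);
      }
    }
    return result;
  }
}
```

Note that abutting intervals (one ending exactly where the other starts) do not count as overlapping under this definition.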
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+    final Set<DataSegment> allSegments = new HashSet<>(segments);

Review Comment: This Set appears to be getting made for the sole purpose of being iterated across in the critical section. I'm not sure why building yet-another set from the set that we already have is really helpful here? I'm guessing that this is the result of code iterations, but please be very conscious of objects. No object should be created without a purpose.
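The reviewer's point about the needless `HashSet` copy can be illustrated with a stand-in sketch (a `String` field stands in for `DataSegment`, and `Set.copyOf` for Guava's `ImmutableSet.copyOf`): because the constructor already stores an immutable copy, the publish path can read the field directly instead of building `allSegments` just to iterate it once.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SegmentsHolder
{
  private final Set<String> segments;  // String stands in for DataSegment

  public SegmentsHolder(Set<String> segments)
  {
    // Analogous to ImmutableSet.copyOf in the action's constructor:
    // the field is already an unmodifiable defensive copy.
    this.segments = Set.copyOf(segments);
  }

  public List<String> intervalsToLock()
  {
    // No intermediate `new HashSet<>(segments)`; the immutable field is safe
    // to stream and iterate as-is.
    return segments.stream().sorted().collect(Collectors.toList());
  }
}
```

The second copy bought no extra safety: nothing mutates the field between construction and publish.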
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ##########

+                  )
+              )
+              .build()
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    // Emit metrics
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+
+    if (retVal.isSuccess()) {
+      toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
+    } else {
+      toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
+    }
+
+    // getSegments() should return an empty set if announceHistoricalSegments() failed
+    for (DataSegment segment : retVal.getSegments()) {
+      metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
+      metricBuilder.setDimension(
+          DruidMetrics.PARTITIONING_TYPE,
+          segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
+      );
+      toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
+    }
+
+    return retVal;
+  }
+
+  private TaskLockInfo getTaskLockInfo(TaskLock taskLock)
+  {
+    return new TaskLockInfo(taskLock.getInterval(), taskLock.getVersion());
+  }

Review Comment: the verb `get` makes it sound like you are trying to "get" something. This is creating a new object, not getting anything. In the future, if you have a need for something like that (converting from one type to another), create a static creator method on the object that you are building. In this case, it would be `TaskLockInfo.fromTaskLock(TaskLock lock)`.
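The static-creator pattern the reviewer describes looks like this in a self-contained sketch (the records stand in for Druid's `TaskLock` and `TaskLockInfo`; `fromTaskLock` is the reviewer's suggested name, not existing code):

```java
public class TaskLockInfoSketch
{
  // Stand-in for org.apache.druid.indexing.common.TaskLock.
  public record TaskLock(String interval, String version) {}

  public record TaskLockInfo(String interval, String version)
  {
    // Static creator: "from" names the conversion, and the method lives on the
    // type being built rather than on whichever caller happens to need it.
    public static TaskLockInfo fromTaskLock(TaskLock lock)
    {
      return new TaskLockInfo(lock.interval(), lock.version());
    }
  }
}
```

Compared with a private `getTaskLockInfo` helper, the factory is discoverable wherever `TaskLockInfo` is used and avoids the misleading `get` prefix.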
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java: ##########

+public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>

Review Comment: There's a bunch of stuff that I see and commented on in `SegmentTransactionalAppendAction` already that seems to exist here too. I'm guessing it's copy pasta. Please clean this class up the same way you clean up the other one.
+ */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.TaskLockInfo; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by + * your task for the segment intervals. + */ +public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult> +{ + /** + * Set of segments that was fully overshadowed by new segments, {@link SegmentTransactionalReplaceAction#segments} + */ + @Nullable + private final Set<DataSegment> segmentsToBeOverwritten; + /** + * Set of segments to be inserted into metadata storage + */ + private final Set<DataSegment> segments; + /** + * Set of segments to be dropped (mark unused) when new segments, {@link SegmentTransactionalReplaceAction#segments}, + * are inserted into metadata storage. + */ + @Nullable + private final Set<DataSegment> segmentsToBeDropped; Review Comment: Do we need this? 
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.TaskLockInfo; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by + * your task for the segment intervals. + */ +public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult> +{ + private final Set<DataSegment> segments; + + @Nullable + private final DataSourceMetadata startMetadata; + @Nullable + private final DataSourceMetadata endMetadata; + @Nullable + private final String dataSource; + + public static SegmentTransactionalAppendAction appendAction( + Set<DataSegment> segments, + @Nullable DataSourceMetadata startMetadata, + @Nullable DataSourceMetadata endMetadata + ) + { + return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata, null); + } + + @JsonCreator + private SegmentTransactionalAppendAction( + @JsonProperty("segments") @Nullable Set<DataSegment> segments, + @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata, + @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata, + @JsonProperty("dataSource") @Nullable String dataSource + ) + { + this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments); + this.startMetadata = startMetadata; + this.endMetadata = endMetadata; + this.dataSource = dataSource; + } + + @JsonProperty + public Set<DataSegment> getSegments() + { + return segments; + } + + @JsonProperty + @Nullable + public DataSourceMetadata getStartMetadata() + { + return startMetadata; + } + + @JsonProperty + @Nullable + public DataSourceMetadata getEndMetadata() + { + return endMetadata; + } + + @JsonProperty + @Nullable + public String getDataSource() + { + return dataSource; + } + + @Override + public TypeReference<SegmentPublishResult> getReturnTypeReference() + { + return new TypeReference<SegmentPublishResult>() + { + }; + } + + /** + * Performs some sanity checks and publishes the given segments. 
+ */ + @Override + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + { + final SegmentPublishResult retVal; + + if (segments.isEmpty()) { + // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable), + // but still needs to update metadata with the progress that the task made. + try { + retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly( + dataSource, + startMetadata, + endMetadata + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + return retVal; + } + + final Set<DataSegment> allSegments = new HashSet<>(segments); + + String datasource = task.getDataSource(); + Map<Interval, TaskLock> replaceLocks = new HashMap<>(); + for (TaskLock lock : TaskLocks.findReplaceLocksForSegments(datasource, toolbox.getTaskLockbox(), segments)) { + replaceLocks.put(lock.getInterval(), lock); + } + Map<DataSegment, TaskLockInfo> appendSegmentLockMap = new HashMap<>(); + Set<TaskLockInfo> taskLockInfos = new HashSet<>(); + for (TaskLock taskLock : replaceLocks.values()) { + taskLockInfos.add(getTaskLockInfo(taskLock)); + } + + for (DataSegment segment : segments) { + Interval interval = segment.getInterval(); + for (Interval key : replaceLocks.keySet()) { + if (key.contains(interval)) { + appendSegmentLockMap.put(segment, getTaskLockInfo(replaceLocks.get(key))); + } + } + } + + try { + retVal = toolbox.getTaskLockbox().doInCriticalSection( + task, + allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + CriticalAction.<SegmentPublishResult>builder() + .onValidLocks( + () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments( + segments, + null, + startMetadata, + endMetadata, + appendSegmentLockMap, + taskLockInfos, + true + ) + ) + .onInvalidLocks( + () -> SegmentPublishResult.fail( + "Invalid task locks. Maybe they are revoked by a higher priority task." + + " Please check the overlord log for details." 
+ ) + ) + .build() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // Emit metrics + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, task); + + if (retVal.isSuccess()) { + toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1)); + } else { + toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1)); + } + + // getSegments() should return an empty set if announceHistoricalSegments() failed Review Comment: If I understand this correctly, this comment is saying that the `for` loop is a noop if the critical section failed. I.e. this loop will only do something if `retVal.isSuccess() == true`. If that's the case, then let's just move the loop inside that part of the if/else above. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java: ########## @@ -430,7 +429,8 @@ protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> inter } prev = cur; - final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur)); + final TaskLockType taskLockType = TaskLockType.valueOf(getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name())); Review Comment: The way you are calling this, when it goes through the default path, you are taking the Enum value `EXCLUSIVE`, turning it into a string, then running that string back through `valueOf` just so that it can return back `EXCLUSIVE`. Would be a lot better to just read the value from the context and then return `EXCLUSIVE` if the value is null. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
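The reviewer's suggestion for the default path can be sketched as follows. `LockTypeResolver`, its method name, and the context key are hypothetical names for illustration, and the enum is a trimmed copy of Druid's `TaskLockType`; the point is to read the raw context value and return the enum constant directly when it is absent, rather than stringifying `EXCLUSIVE` only to parse it straight back.

```java
import java.util.Map;

// Trimmed copy of Druid's TaskLockType, for illustration only.
enum TaskLockType { EXCLUSIVE, SHARED, REPLACE, APPEND }

// Hypothetical helper: resolve the lock type from a task context map without
// the EXCLUSIVE -> "EXCLUSIVE" -> valueOf round trip on the default path.
final class LockTypeResolver {
    static TaskLockType fromContext(Map<String, Object> context, String key) {
        final Object raw = context.get(key);
        return raw == null ? TaskLockType.EXCLUSIVE : TaskLockType.valueOf(raw.toString());
    }
}
```

Note that `Enum.valueOf` still throws `IllegalArgumentException` for unknown names, so a malformed context value fails loudly under either formulation; only the no-op round trip on the default path goes away.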
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; +import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.DurationGranularity; +import 
org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +/** + * A test Task which mimics an appending task by having similar interactions with the overlord. + * + * Begins running by acquiring an APPEND lock and immediately allocates pending segments. 
+ * + * Task ends after publishing these pending segments and relevant metadata entries in a transaction + * Replace lock exists with version V3 + * V0 -> PS0 -> append task begins + * V1 -> (S1-0) -> replace task begins and published and completed + * V2 -> (S2-0) -> replace task begins and published and completed + * V3 -> Replace acquired lock + * + * append task publishes PS0 -> S0-0, S1-1, S2-1 ; (S0-0, V3); Append task has completed + * + * + * (S3-0, 1) today + * (S3-0, 2), (S3-1, 2) needs to happen + * V3 replace task finishes -> (S3-0, S0-0 == S3-1) + * + * segment metadata -> Publish all pending segments and also create copies for greater versions for which used segments exist + * forward metadata -> Publish mapping of the original segments to the EXCLUSIVE lock held for the same interval when present + */ +public class AppendTask extends AbstractTask +{ + private final Interval interval; + private final Granularity segmentGranularity; + private final String lockType; + private final int priority; + private final int numPartitions; + private final CountDownLatch readyLatch = new CountDownLatch(1); + private final CountDownLatch runLatch = new CountDownLatch(1); + private final CountDownLatch segmentAllocationComplete = new CountDownLatch(1); + private final CountDownLatch runComplete = new CountDownLatch(1); + private final CountDownLatch readyComplete = new CountDownLatch(1); + + private final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>(); + + private TaskToolbox toolbox; + private final AtomicInteger sequenceId = new AtomicInteger(0); + + public AppendTask( + @JsonProperty("id") String id, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("segmentGranularity") Granularity segmentGranularity, + @JsonProperty("context") Map<String, Object> context + ) + { + super( + id == null ? 
StringUtils.format("replace_%s_%s", DateTimes.nowUtc(), UUID.randomUUID().toString()) : id, + dataSource == null ? "none" : dataSource, + context, + IngestionMode.APPEND + ); + this.interval = interval; + this.segmentGranularity = segmentGranularity; + this.lockType = getContextValue(Tasks.TASK_LOCK_TYPE, "EXCLUSIVE"); Review Comment: If this is an Append task, why would it use any lock other than the append lock? ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; +import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.DurationGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; 
+import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +/** + * A test Task which mimics an appending task by having similar interactions with the overlord. + * + * Begins running by acquiring an APPEND lock and immediately allocates pending segments. + * + * Task ends after publishing these pending segments and relevant metadata entries in a transaction + * Replace lock exists with version V3 + * V0 -> PS0 -> append task begins + * V1 -> (S1-0) -> replace task begins and published and completed + * V2 -> (S2-0) -> replace task begins and published and completed + * V3 -> Replace acquired lock + * + * append task publishes PS0 -> S0-0, S1-1, S2-1 ; (S0-0, V3); Append task has completed + * + * + * (S3-0, 1) today + * (S3-0, 2), (S3-1, 2) needs to happen + * V3 replace task finishes -> (S3-0, S0-0 == S3-1) Review Comment: These comments seem... out of place? ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; +import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.DurationGranularity; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; 
+import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +/** + * A test Task which mimics an appending task by having similar interactions with the overlord. + * + * Begins running by acquiring an APPEND lock and immediately allocates pending segments. + * + * Task ends after publishing these pending segments and relevant metadata entries in a transaction + * Replace lock exists with version V3 + * V0 -> PS0 -> append task begins + * V1 -> (S1-0) -> replace task begins and published and completed + * V2 -> (S2-0) -> replace task begins and published and completed + * V3 -> Replace acquired lock + * + * append task publishes PS0 -> S0-0, S1-1, S2-1 ; (S0-0, V3); Append task has completed + * + * + * (S3-0, 1) today + * (S3-0, 2), (S3-1, 2) needs to happen + * V3 replace task finishes -> (S3-0, S0-0 == S3-1) + * + * segment metadata -> Publish all pending segments and also create copies for greater versions for which used segments exist + * forward metadata -> Publish mapping of the original segments to the EXCLUSIVE lock held for the same interval when present Review Comment: Is this intended to be part of the javadoc? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
