abhishekagarwal87 commented on code in PR #14407: URL: https://github.com/apache/druid/pull/14407#discussion_r1349492533
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.ReplaceTaskLock; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.timeline.DataSegment; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by + * your task for the segment intervals. 
Review Comment: This javadoc is not very clear. Versions of "what" segments? The ones being replaced? Also, what does it mean here by "your" task? Some verbosity here could be helpful. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java: ########## @@ -110,6 +127,63 @@ public static boolean isLockCoversSegments( ); } + /** + * Determines the type of time chunk lock to use for appending segments. + * <p> + * This method should be de-duplicated with {@link AbstractBatchIndexTask#determineLockType} + * by passing the ParallelIndexSupervisorTask instance into the + * SinglePhaseParallelIndexTaskRunner. + */ + public static TaskLockType determineLockTypeForAppend( + Map<String, Object> taskContext + ) + { + final Object lockType = taskContext.get(Tasks.TASK_LOCK_TYPE); + if (lockType == null) { + final boolean useSharedLock = (boolean) taskContext.getOrDefault(Tasks.USE_SHARED_LOCK, false); + return useSharedLock ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE; + } else { + return TaskLockType.valueOf(lockType.toString()); + } + } + + /** + * Finds locks of type {@link TaskLockType#REPLACE} for each of the given segments + * that have an interval completely covering the interval of the respective segments. + * + * @return Map from segment to REPLACE lock that completely covers it. The map + * does not contain an entry for segments that have no covering REPLACE lock. 
+ */ + public static Map<DataSegment, ReplaceTaskLock> findReplaceLocksCoveringSegments( + final String datasource, + final TaskLockbox taskLockbox, + final Set<DataSegment> segments + ) + { + // Identify unique segment intervals + final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>(); + segments.forEach( + segment -> intervalToSegments.computeIfAbsent( + segment.getInterval(), interval -> new ArrayList<>() + ).add(segment) + ); + + final Set<ReplaceTaskLock> replaceLocks = taskLockbox.getAllReplaceLocksForDatasource(datasource); Review Comment: Is this a metadata db call? ########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.ReplaceTaskLock; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.timeline.DataSegment; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by + * your task for the segment intervals. + */ +public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult> +{ + /** + * Set of segments to be inserted into metadata storage + */ + private final Set<DataSegment> segments; + + public static SegmentTransactionalReplaceAction create( + Set<DataSegment> segmentsToPublish + ) + { + return new SegmentTransactionalReplaceAction(segmentsToPublish); + } + + @JsonCreator + private SegmentTransactionalReplaceAction( + @JsonProperty("segments") Set<DataSegment> segments + ) + { + this.segments = ImmutableSet.copyOf(segments); + } + + @JsonProperty + public Set<DataSegment> getSegments() + { + return segments; + } + + @Override + public TypeReference<SegmentPublishResult> getReturnTypeReference() + { + return new TypeReference<SegmentPublishResult>() + { + }; + } + + /** + * Performs some sanity checks and publishes the given segments. 
+ */ + @Override + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + { + TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); + + // Find the active replace locks held only by this task + final Set<ReplaceTaskLock> replaceLocksForTask + = toolbox.getTaskLockbox().findReplaceLocksForTask(task); + + final SegmentPublishResult retVal; + try { + retVal = toolbox.getTaskLockbox().doInCriticalSection( + task, + segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), + CriticalAction.<SegmentPublishResult>builder() + .onValidLocks( + () -> toolbox.getIndexerMetadataStorageCoordinator() + .commitReplaceSegments(segments, replaceLocksForTask) + ) + .onInvalidLocks( + () -> SegmentPublishResult.fail( + "Invalid task locks. Maybe they are revoked by a higher priority task." + + " Please check the overlord log for details." + ) + ) + .build() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // Emit metrics + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, task); + + if (retVal.isSuccess()) { + toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1)); + + for (DataSegment segment : retVal.getSegments()) { + final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType(); + metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType); + metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString()); Review Comment: why not call the same setSegmentDimensions method here too? 
########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java: ########## @@ -443,6 +466,42 @@ protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> inter return true; } + private TaskLockHelper createLockHelper(LockGranularity lockGranularity) + { + return new TaskLockHelper( + lockGranularity == LockGranularity.SEGMENT, + determineLockType(lockGranularity) + ); + } + + /** + * Determines the type of lock to use with the given lock granularity. + */ + private TaskLockType determineLockType(LockGranularity lockGranularity) + { + if (lockGranularity == LockGranularity.SEGMENT) { + return TaskLockType.EXCLUSIVE; + } + + final String contextLockType = getContextValue(Tasks.TASK_LOCK_TYPE); + final TaskLockType lockType; + if (contextLockType == null) { + lockType = getContextValue(Tasks.USE_SHARED_LOCK, false) + ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE; + } else { + lockType = TaskLockType.valueOf(contextLockType); + } + + final IngestionMode ingestionMode = getIngestionMode(); + if ((lockType == TaskLockType.SHARED || lockType == TaskLockType.APPEND) + && ingestionMode != IngestionMode.APPEND) { + // Lock types SHARED and APPEND are allowed only in APPEND ingestion mode + return Tasks.DEFAULT_TASK_LOCK_TYPE; + } else { + return lockType; Review Comment: should we instead throw an exception when task lock type is Append but ingestion mode is not append? That seems like something that would only happen because of a mistake. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java: ########## @@ -218,7 +221,22 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // abandoned. 
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); - toolbox.getDataSegmentKiller().kill(unusedSegments); + + // Fetch the load specs of all segments overlapping with the given interval + final Set<Map<String, Object>> usedSegmentLoadSpecs = toolbox + .getTaskActionClient() + .submit(new RetrieveUsedSegmentsAction(getDataSource(), getInterval(), null, Segments.INCLUDING_OVERSHADOWED)) + .stream() + .map(DataSegment::getLoadSpec) + .collect(Collectors.toSet()); + + // Kill segments from the deep storage only if their load specs are not being used by any used segments + final List<DataSegment> segmentsToBeKilled = unusedSegments + .stream() + .filter(unusedSegment -> !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec())) + .collect(Collectors.toList()); + + toolbox.getDataSegmentKiller().kill(segmentsToBeKilled); Review Comment: This action is very expensive, especially if the interval is eternity, which is often the case with the killUnusedSegments task. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java: ########## @@ -110,6 +127,63 @@ public static boolean isLockCoversSegments( ); } + /** + * Determines the type of time chunk lock to use for appending segments. + * <p> + * This method should be de-duplicated with {@link AbstractBatchIndexTask#determineLockType} + * by passing the ParallelIndexSupervisorTask instance into the + * SinglePhaseParallelIndexTaskRunner. Review Comment: This passage is not clear. What kind of de-duplication does it refer to? 
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java: ########## @@ -96,7 +112,8 @@ public static boolean isLockCoversSegments( final TimeChunkLock timeChunkLock = (TimeChunkLock) lock; return timeChunkLock.getInterval().contains(segment.getInterval()) && timeChunkLock.getDataSource().equals(segment.getDataSource()) - && timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0; + && (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0 + || TaskLockType.APPEND.equals(timeChunkLock.getType())); Review Comment: Please leave some comments here, like append by definition covers all versions unlike other lock types ########## indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java: ########## @@ -889,6 +891,71 @@ public List<TaskLock> findLocksForTask(final Task task) } } + /** + * Finds the active non-revoked REPLACE locks held by the given task. + */ + public Set<ReplaceTaskLock> findReplaceLocksForTask(Task task) + { + giant.lock(); + try { + return getNonRevokedReplaceLocks(findLockPossesForTask(task), task.getDataSource()); + } + finally { + giant.unlock(); + } + } + + /** + * Finds all the active non-revoked REPLACE locks for the given datasource. 
+ */ + public Set<ReplaceTaskLock> getAllReplaceLocksForDatasource(String datasource) + { + giant.lock(); + try { + final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> activeLocks = running.get(datasource); + if (activeLocks == null) { + return ImmutableSet.of(); + } + + List<TaskLockPosse> lockPosses + = activeLocks.values() + .stream() + .flatMap(map -> map.values().stream()) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + return getNonRevokedReplaceLocks(lockPosses, datasource); + } + finally { + giant.unlock(); + } + } + + private Set<ReplaceTaskLock> getNonRevokedReplaceLocks(List<TaskLockPosse> posses, String datasource) + { + final Set<ReplaceTaskLock> replaceLocks = new HashSet<>(); + for (TaskLockPosse posse : posses) { + final TaskLock lock = posse.getTaskLock(); + if (lock.isRevoked() || !TaskLockType.REPLACE.equals(posse.getTaskLock().getType())) { + continue; + } + + // Replace locks are always held by the supervisor task + if (posse.taskIds.size() > 1) { + throw new ISE( + "Replace lock[%s] for datasource[%s] is held by multiple tasks[%s]", + lock, datasource, posse.taskIds + ); Review Comment: DruidException.defensive should have been better fit. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.TaskLockInfo; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by + * your task for the segment intervals. 
+ */ +public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult> +{ + private final Set<DataSegment> segments; + + @Nullable + private final DataSourceMetadata startMetadata; + @Nullable + private final DataSourceMetadata endMetadata; + @Nullable + private final String dataSource; + + public static SegmentTransactionalAppendAction appendAction( + Set<DataSegment> segments, + @Nullable DataSourceMetadata startMetadata, + @Nullable DataSourceMetadata endMetadata + ) + { + return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata, null); + } + + @JsonCreator + private SegmentTransactionalAppendAction( + @JsonProperty("segments") @Nullable Set<DataSegment> segments, + @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata, + @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata, + @JsonProperty("dataSource") @Nullable String dataSource + ) + { + this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments); + this.startMetadata = startMetadata; + this.endMetadata = endMetadata; + this.dataSource = dataSource; + } + + @JsonProperty + public Set<DataSegment> getSegments() + { + return segments; + } + + @JsonProperty + @Nullable + public DataSourceMetadata getStartMetadata() + { + return startMetadata; + } + + @JsonProperty + @Nullable + public DataSourceMetadata getEndMetadata() + { + return endMetadata; + } + + @JsonProperty + @Nullable + public String getDataSource() + { + return dataSource; + } + + @Override + public TypeReference<SegmentPublishResult> getReturnTypeReference() + { + return new TypeReference<SegmentPublishResult>() + { + }; + } + + /** + * Performs some sanity checks and publishes the given segments. 
+ */ + @Override + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + { + final SegmentPublishResult retVal; + + if (segments.isEmpty()) { + // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable), + // but still needs to update metadata with the progress that the task made. + try { + retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly( + dataSource, + startMetadata, + endMetadata + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + return retVal; + } + + final Set<DataSegment> allSegments = new HashSet<>(segments); + + String datasource = task.getDataSource(); + Map<Interval, TaskLock> replaceLocks = new HashMap<>(); + for (TaskLock lock : TaskLocks.findReplaceLocksForSegments(datasource, toolbox.getTaskLockbox(), segments)) { + replaceLocks.put(lock.getInterval(), lock); + } + Map<DataSegment, TaskLockInfo> appendSegmentLockMap = new HashMap<>(); + Set<TaskLockInfo> taskLockInfos = new HashSet<>(); + for (TaskLock taskLock : replaceLocks.values()) { + taskLockInfos.add(getTaskLockInfo(taskLock)); + } + + for (DataSegment segment : segments) { + Interval interval = segment.getInterval(); + for (Interval key : replaceLocks.keySet()) { + if (key.contains(interval)) { + appendSegmentLockMap.put(segment, getTaskLockInfo(replaceLocks.get(key))); + } + } + } + + try { + retVal = toolbox.getTaskLockbox().doInCriticalSection( + task, + allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + CriticalAction.<SegmentPublishResult>builder() + .onValidLocks( + () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments( + segments, + null, + startMetadata, + endMetadata, + appendSegmentLockMap, + taskLockInfos, + true + ) + ) + .onInvalidLocks( + () -> SegmentPublishResult.fail( + "Invalid task locks. Maybe they are revoked by a higher priority task." + + " Please check the overlord log for details." 
+ ) Review Comment: We should have logged the intervals though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
