imply-cheddar commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1231844539
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java:
##########
@@ -79,7 +79,9 @@
@Type(name = "index_realtime", value = RealtimeIndexTask.class),
@Type(name = "index_realtime_appenderator", value =
AppenderatorDriverRealtimeIndexTask.class),
@Type(name = "noop", value = NoopTask.class),
- @Type(name = "compact", value = CompactionTask.class)
+ @Type(name = "compact", value = CompactionTask.class),
+ @Type(name = "replace", value = ReplaceTask.class),
+ @Type(name = "append", value = AppendTask.class)
Review Comment:
Do you need to add these? This is polluting the actual space of tasks that
are deployed and usable from an operating Druid system. If the tasks are for
tests, they shouldn't be registered in production.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TimeChunkLockRequest.java:
##########
@@ -115,6 +115,9 @@ public Interval getInterval()
@Override
public String getVersion()
{
+ if (lockType.equals(TaskLockType.APPEND) && preferredVersion == null) {
+ return "1970-01-01T00:00:00.000Z";
+ }
return preferredVersion == null ? DateTimes.nowUtc().toString() :
preferredVersion;
Review Comment:
The defaulting happening in this getter is a bit weird, let's try to make
the things that build the Request do the right thing and make this getter less
intelligent.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -911,9 +914,23 @@ private TaskStatus generateAndPublishSegments(
}
- final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish, commitMetadata) ->
- toolbox.getTaskActionClient()
-
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish));
+ final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish, commitMetadata) -> {
+ TaskLockType lockType =
TaskLockType.valueOf(getContextValue(Tasks.TASK_LOCK_TYPE,
TaskLockType.EXCLUSIVE.name()));
+ switch (lockType) {
+ case REPLACE:
+ return toolbox.getTaskActionClient().submit(
+
SegmentTransactionalReplaceAction.overwriteAction(segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish)
Review Comment:
I'm not sure that the `overwriteAction` part of the method name is all that
useful (it actually just confused me a bit as I thought this was the same call
as the `default` one before I looked at the class names). Just `create` should
be fine as the static method is basically just creating an action.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.DurationGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+/**
+ * A test Task which mimics an appending task by having similar interactions
with the overlord.
+ *
+ * Begins running by acquiring an APPEND lock and immediately allocates
pending segments.
+ *
+ * Task ends after publishing these pending segments and relevant metadata
entries in a transaction
+ * Replace lock exists with version V3
+ * V0 -> PS0 -> append task begins
+ * V1 -> (S1-0) -> replace task begins and published and completed
+ * V2 -> (S2-0) -> replace task begins and published and completed
+ * V3 -> Replace acquired lock
+ *
+ * append task publishes PS0 -> S0-0, S1-1, S2-1 ; (S0-0, V3); Append task has
completed
+ *
+ *
+ * (S3-0, 1) today
+ * (S3-0, 2), (S3-1, 2) needs to happen
+ * V3 replace task finishes -> (S3-0, S0-0 == S3-1)
+ *
+ * segment metadata -> Publish all pending segments and also create copies for
greater versions for which used segments exist
+ * forward metadata -> Publish mapping of the original segments to the
EXCLUSIVE lock held for the same interval when present
+ */
+public class AppendTask extends AbstractTask
+{
+ private final Interval interval;
+ private final Granularity segmentGranularity;
+ private final String lockType;
+ private final int priority;
+ private final int numPartitions;
+ private final CountDownLatch readyLatch = new CountDownLatch(1);
+ private final CountDownLatch runLatch = new CountDownLatch(1);
+ private final CountDownLatch segmentAllocationComplete = new
CountDownLatch(1);
+ private final CountDownLatch runComplete = new CountDownLatch(1);
+ private final CountDownLatch readyComplete = new CountDownLatch(1);
+
+ private final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>();
+
+ private TaskToolbox toolbox;
+ private final AtomicInteger sequenceId = new AtomicInteger(0);
+
+ public AppendTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+ @JsonProperty("context") Map<String, Object> context
+ )
+ {
+ super(
+ id == null ? StringUtils.format("replace_%s_%s", DateTimes.nowUtc(),
UUID.randomUUID().toString()) : id,
+ dataSource == null ? "none" : dataSource,
+ context,
+ IngestionMode.APPEND
+ );
+ this.interval = interval;
+ this.segmentGranularity = segmentGranularity;
+ this.lockType = getContextValue(Tasks.TASK_LOCK_TYPE, "EXCLUSIVE");
+ this.priority = getContextValue(Tasks.PRIORITY_KEY, 0);
+ this.numPartitions = getContextValue("numPartitions", 0);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "replace";
+ }
+
+ @Nonnull
+ @JsonIgnore
+ @Override
+ public Set<ResourceAction> getInputSourceResources()
+ {
+ return ImmutableSet.of();
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
+ {
+ readyLatch.await();
+ return tryTimeChunkLockSingleInterval(
+ new SurrogateTaskActionClient(getId(), taskActionClient),
+ interval,
+ TaskLockType.valueOf(lockType)
+ );
+ }
+
+ private boolean tryTimeChunkLockSingleInterval(TaskActionClient client,
Interval interval, TaskLockType lockType)
+ throws IOException
+ {
+ final TaskLock lock = client.submit(new
TimeChunkLockTryAcquireAction(lockType, interval));
+ if (lock == null) {
+ return false;
+ }
+ if (lock.isRevoked()) {
+ throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.",
interval));
+ }
+ return true;
+ }
+
+ @Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+ {
+ this.toolbox = toolbox;
+ readyComplete.countDown();
+
+ //final Set<SegmentIdWithShardSpec> pendingSegments =
allocatePendingSegments(toolbox);
+
+ segmentAllocationComplete.await();
+
+ runLatch.await();
+
+ if (publishSegments(toolbox, convertPendingSegments(pendingSegments))) {
+ return TaskStatus.success(getId());
+ }
+ return TaskStatus.failure(getId(), "Failed to append segments");
+ }
+
+ @Override
+ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws
Exception
+ {
+ super.cleanUp(toolbox, taskStatus);
+ runComplete.countDown();
+ }
+
+ public SegmentIdWithShardSpec allocateOrGetSegmentForTimestamp(String
timestamp)
+ {
+ final DateTime time = DateTime.parse(timestamp);
+ for (SegmentIdWithShardSpec pendingSegment : pendingSegments) {
+ if (pendingSegment.getInterval().contains(time)) {
+ return pendingSegment;
+ }
+ }
+ return allocateNewSegmentForDate(time);
+ }
+
+ public SegmentIdWithShardSpec allocateNewSegmentForTimestamp(String
timestamp)
+ {
+ return allocateNewSegmentForDate(DateTime.parse(timestamp));
+ }
Review Comment:
These are the methods that you are using to tell the task what to do from
the test thread. But they are also doing all of their work on the actual test
thread, not on the task's thread. This means that if you ever have one of
these block, it's going to block your test thread instead of the task thread,
meaning that your test cannot make progress.
In order to fix this, you will need to make the actual `run()` part of the
task basically just sit and wait on a queue of work for it to do. Then these
calls would add new Runnables to that queue. Once you do that, you will likely
find that it's incredibly simple to control the behavior of the tasks from the
test itself.
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -259,6 +263,16 @@ SegmentIdWithShardSpec allocatePendingSegment(
* @throws IllegalArgumentException if startMetadata and endMetadata are not
either both null or both non-null
* @throws RuntimeException if the state of metadata storage after
this call is unknown
*/
+ SegmentPublishResult announceHistoricalSegments(
+ Set<DataSegment> segments,
+ Set<DataSegment> segmentsToDrop,
+ @Nullable DataSourceMetadata startMetadata,
+ @Nullable DataSourceMetadata endMetadata,
+ @Nullable Map<DataSegment, TaskLockInfo> segmentLockMap,
+ @Nullable Set<TaskLockInfo> taskLockInfos,
+ boolean append
+ ) throws IOException;
+
Review Comment:
This is running the risk of changing the logic of how the other locks
operate. I'd rather not take that risk. Instead of trying to reuse this
method, please create 2 new methods:
```
commitReplaceSegments()
commitAppendSegments()
```
And have the new locks use those methods instead. If you do that, then the
code for the DB interactions should all be new method implementations without
any interleaved changes with old logic, which is an easier way to validate that
old logic goes unchanged.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java:
##########
@@ -72,4 +72,9 @@ default boolean supportsEmptyPublish()
{
return false;
}
+
+
+ // append starts (2023-01-01/2023-02-01) - V0-0 pending segment
+ // replace starts and ends (2023-01-01/2024-01-01) - V1-0 committed segment
+ // append ends (2023-01-01/2023-02-01) - V0-0 committed segment,
(2023-01-01/2024-01-01) V1-1 committed segment
Review Comment:
Random extra comments here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/ReplaceTask.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+
+/**
+ * A test Task which mimics a replacing task by having similar interactions
with the overlord.
+ *
+ * Begins running by acquiring a REPLACE lock
+ *
+ * Task ends after publishing a set of core partitions and
+ * creating metadata copies for all appended segments published when this lock
was held
+ */
+public class ReplaceTask extends AbstractTask
+{
+ private final Interval interval;
+ private final Granularity segmentGranularity;
+ private final String lockType;
+ private final int priority;
+ private final int numCorePartitions;
+ private final CountDownLatch readyLatch = new CountDownLatch(1);
+ private final CountDownLatch readyComplete = new CountDownLatch(1);
+ private final CountDownLatch runLatch = new CountDownLatch(1);
+ private final CountDownLatch runComplete = new CountDownLatch(1);
+ private String version;
+
+ @JsonCreator
+ public ReplaceTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+ @JsonProperty("context") Map<String, Object> context
Review Comment:
Given that this is intended to be used in tests, do you need all the jackson
stuff?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]