imply-cheddar commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1231844539


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java:
##########
@@ -79,7 +79,9 @@
     @Type(name = "index_realtime", value = RealtimeIndexTask.class),
     @Type(name = "index_realtime_appenderator", value = 
AppenderatorDriverRealtimeIndexTask.class),
     @Type(name = "noop", value = NoopTask.class),
-    @Type(name = "compact", value = CompactionTask.class)
+    @Type(name = "compact", value = CompactionTask.class),
+    @Type(name = "replace", value = ReplaceTask.class),
+    @Type(name = "append", value = AppendTask.class)

Review Comment:
   Do you need to add these?  This is polluting the actual space of tasks that 
are deployed and usable from an operating Druid system.  If the tasks are for 
tests, they shouldn't be registered in production.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TimeChunkLockRequest.java:
##########
@@ -115,6 +115,9 @@ public Interval getInterval()
   @Override
   public String getVersion()
   {
+    if (lockType.equals(TaskLockType.APPEND) && preferredVersion == null) {
+      return "1970-01-01T00:00:00.000Z";
+    }
     return preferredVersion == null ? DateTimes.nowUtc().toString() : 
preferredVersion;

Review Comment:
   The defaulting happening in this getter is a bit weird, let's try to make 
the things that build the Request do the right thing and make this getter less 
intelligent.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -911,9 +914,23 @@ private TaskStatus generateAndPublishSegments(
     }
 
 
-    final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, 
segmentsToDrop, segmentsToPublish, commitMetadata) ->
-        toolbox.getTaskActionClient()
-               
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
 segmentsToDrop, segmentsToPublish));
+    final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, 
segmentsToDrop, segmentsToPublish, commitMetadata) -> {
+      TaskLockType lockType = 
TaskLockType.valueOf(getContextValue(Tasks.TASK_LOCK_TYPE, 
TaskLockType.EXCLUSIVE.name()));
+      switch (lockType) {
+        case REPLACE:
+          return toolbox.getTaskActionClient().submit(
+              
SegmentTransactionalReplaceAction.overwriteAction(segmentsToBeOverwritten, 
segmentsToDrop, segmentsToPublish)

Review Comment:
   I'm not sure that the `overwriteAction` part of the method name is all that 
useful (it actually just confused me a bit, as I thought this was the same call 
as the `default` one before I looked at the class names).  Just `create` should 
be fine, since the static method is basically just creating an action.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.DurationGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+/**
+ * A test Task which mimics an appending task by having similar interactions 
with the overlord.
+ *
+ * Begins running by acquiring an APPEND lock and immediately allocates 
pending segments.
+ *
+ * Task ends after publishing these pending segments and relevant metadata 
entries in a transaction
+ * Replace lock exists with version V3
+ * V0 -> PS0 -> append task begins
+ * V1 -> (S1-0) -> replace task begins and published and completed
+ * V2 -> (S2-0) -> replace task begins and published and completed
+ * V3 -> Replace acquired lock
+ *
+ * append task publishes PS0 -> S0-0, S1-1, S2-1 ; (S0-0, V3); Append task has 
completed
+ *
+ *
+ * (S3-0, 1) today
+ * (S3-0, 2), (S3-1, 2) needs to happen
+ * V3 replace task finishes -> (S3-0, S0-0 == S3-1)
+ *
+ * segment metadata -> Publish all pending segments and also create copies for 
greater versions for which used segments exist
+ * forward metadata -> Publish mapping of the original segments to the 
EXCLUSIVE lock held for the same interval when present
+ */
+public class AppendTask extends AbstractTask
+{
+  private final Interval interval;
+  private final Granularity segmentGranularity;
+  private final String lockType;
+  private final int priority;
+  private final int numPartitions;
+  private final CountDownLatch readyLatch = new CountDownLatch(1);
+  private final CountDownLatch runLatch = new CountDownLatch(1);
+  private final CountDownLatch segmentAllocationComplete = new 
CountDownLatch(1);
+  private final CountDownLatch runComplete = new CountDownLatch(1);
+  private final CountDownLatch readyComplete = new CountDownLatch(1);
+
+  private final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>();
+
+  private TaskToolbox toolbox;
+  private final AtomicInteger sequenceId = new AtomicInteger(0);
+
+  public AppendTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+      @JsonProperty("context") Map<String, Object> context
+  )
+  {
+    super(
+        id == null ? StringUtils.format("replace_%s_%s", DateTimes.nowUtc(), 
UUID.randomUUID().toString()) : id,
+        dataSource == null ? "none" : dataSource,
+        context,
+        IngestionMode.APPEND
+    );
+    this.interval = interval;
+    this.segmentGranularity = segmentGranularity;
+    this.lockType = getContextValue(Tasks.TASK_LOCK_TYPE, "EXCLUSIVE");
+    this.priority = getContextValue(Tasks.PRIORITY_KEY, 0);
+    this.numPartitions = getContextValue("numPartitions", 0);
+  }
+
+  @Override
+  public String getType()
+  {
+    return "replace";
+  }
+
+  @Nonnull
+  @JsonIgnore
+  @Override
+  public Set<ResourceAction> getInputSourceResources()
+  {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    readyLatch.await();
+    return tryTimeChunkLockSingleInterval(
+        new SurrogateTaskActionClient(getId(), taskActionClient),
+        interval,
+        TaskLockType.valueOf(lockType)
+    );
+  }
+
+  private boolean tryTimeChunkLockSingleInterval(TaskActionClient client, 
Interval interval, TaskLockType lockType)
+      throws IOException
+  {
+    final TaskLock lock = client.submit(new 
TimeChunkLockTryAcquireAction(lockType, interval));
+    if (lock == null) {
+      return false;
+    }
+    if (lock.isRevoked()) {
+      throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", 
interval));
+    }
+    return true;
+  }
+
+  @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    this.toolbox = toolbox;
+    readyComplete.countDown();
+
+    //final Set<SegmentIdWithShardSpec> pendingSegments = 
allocatePendingSegments(toolbox);
+
+    segmentAllocationComplete.await();
+
+    runLatch.await();
+
+    if (publishSegments(toolbox, convertPendingSegments(pendingSegments))) {
+      return TaskStatus.success(getId());
+    }
+    return TaskStatus.failure(getId(), "Failed to append segments");
+  }
+
+  @Override
+  public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws 
Exception
+  {
+    super.cleanUp(toolbox, taskStatus);
+    runComplete.countDown();
+  }
+
+  public SegmentIdWithShardSpec allocateOrGetSegmentForTimestamp(String 
timestamp)
+  {
+    final DateTime time = DateTime.parse(timestamp);
+    for (SegmentIdWithShardSpec pendingSegment : pendingSegments) {
+      if (pendingSegment.getInterval().contains(time)) {
+        return pendingSegment;
+      }
+    }
+    return allocateNewSegmentForDate(time);
+  }
+
+  public SegmentIdWithShardSpec allocateNewSegmentForTimestamp(String 
timestamp)
+  {
+    return allocateNewSegmentForDate(DateTime.parse(timestamp));
+  }

Review Comment:
   These are the methods that you are using to tell the task what to do from 
the test thread.  But they are also doing all of their work on the actual test 
thread, not on the task's thread.  This means that if you ever have one of 
these block, it's going to block your test thread instead of the task thread, 
meaning that your test cannot make progress.
   
   In order to fix this, you will need to make the actual `run()` part of the 
task basically just sit and wait on a queue of work for it to do.  Then these 
calls would add new Runnables to that queue.  Once you do that, you will likely 
find that it's incredibly simple to control the behavior of the tasks from the 
test itself.



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -259,6 +263,16 @@ SegmentIdWithShardSpec allocatePendingSegment(
    * @throws IllegalArgumentException if startMetadata and endMetadata are not 
either both null or both non-null
    * @throws RuntimeException         if the state of metadata storage after 
this call is unknown
    */
+  SegmentPublishResult announceHistoricalSegments(
+      Set<DataSegment> segments,
+      Set<DataSegment> segmentsToDrop,
+      @Nullable DataSourceMetadata startMetadata,
+      @Nullable DataSourceMetadata endMetadata,
+      @Nullable Map<DataSegment, TaskLockInfo> segmentLockMap,
+      @Nullable Set<TaskLockInfo> taskLockInfos,
+      boolean append
+  ) throws IOException;
+

Review Comment:
   This is running the risk of changing the logic of how the other locks 
operate, and I'd rather not take that risk.  Instead of trying to reuse this 
method, please create 2 new methods:
   
   ```
   commitReplaceSegments()
   commitAppendSegments()
   ```
   
   And have the new locks use those methods instead.  If you do that, then the 
code for the DB interactions should all be new method implementations with no 
changes interleaved into the old logic, which makes it much easier to validate 
that the old logic remains unchanged.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java:
##########
@@ -72,4 +72,9 @@ default boolean supportsEmptyPublish()
   {
     return false;
   }
+
+
+  // append starts (2023-01-01/2023-02-01) - V0-0 pending segment
+  // replace starts and ends (2023-01-01/2024-01-01) - V1-0 committed segment
+  // append ends (2023-01-01/2023-02-01) - V0-0 committed segment, 
(2023-01-01/2024-01-01) V1-1 committed segment

Review Comment:
   Random extra comments here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/ReplaceTask.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+
+/**
+ * A test Task which mimics a replacing task by having similar interactions 
with the overlord.
+ *
+ * Begins running by acquiring a REPLACE lock
+ *
+ * Task ends after publishing a set of core partitions and
+ * creating metadata copies for all appended segments published when this lock 
was held
+ */
+public class ReplaceTask extends AbstractTask
+{
+  private final Interval interval;
+  private final Granularity segmentGranularity;
+  private final String lockType;
+  private final int priority;
+  private final int numCorePartitions;
+  private final CountDownLatch readyLatch = new CountDownLatch(1);
+  private final CountDownLatch readyComplete = new CountDownLatch(1);
+  private final CountDownLatch runLatch = new CountDownLatch(1);
+  private final CountDownLatch runComplete = new CountDownLatch(1);
+  private String version;
+
+  @JsonCreator
+  public ReplaceTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+      @JsonProperty("context") Map<String, Object> context

Review Comment:
   Given that this is intended to be used in tests, do you need all the jackson 
stuff?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to