kfaraz commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1552762850


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1184,6 +1201,23 @@ public void remove(final Task task)
               task.getId()
           );
         }
+        final String pendingSegmentGroup = task.getPendingSegmentGroup();

Review Comment:
   I would advise keeping the cleanup logic in a separate PR so that we can 
focus on it and test it better.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -269,6 +269,12 @@ public boolean withinMinMaxRecordTime(final InputRow row)
     return !beforeMinimumMessageTime && !afterMaximumMessageTime;
   }
 
+  @Override
+  public String getPendingSegmentGroup()
+  {
+    return getTaskResource().getAvailabilityGroup();

Review Comment:
   Maybe add a comment here that this would turn out to be the same as the 
`baseSequenceName`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    List<PendingSegment> pendingSegments

Review Comment:
   This would fetch all pending segments of this datasource, and we would try 
to build a list of all pending segments of this datasource that were ever 
upgraded. Since the upgrade is now happening in the same transaction as the 
commit, `SegmentPublishResult` should include the map of upgraded pending 
segments.
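
A minimal sketch of this suggestion, assuming hypothetical simplified types: segment ids are plain `String`s, and `PublishResultSketch` stands in for `SegmentPublishResult` (it is not Druid's actual class). The commit reports the upgrades it performed, so the caller handles only those instead of scanning every pending segment of the datasource:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for SegmentPublishResult that also reports the pending
 * segments upgraded in the same metadata transaction as the commit.
 * Keys are the original pending segment ids; values are the upgraded ids.
 */
final class PublishResultSketch
{
  private final boolean success;
  private final Map<String, String> upgradedPendingSegments;

  private PublishResultSketch(boolean success, Map<String, String> upgradedPendingSegments)
  {
    this.success = success;
    // Defensive, read-only copy so callers cannot mutate the result.
    this.upgradedPendingSegments = Collections.unmodifiableMap(new HashMap<>(upgradedPendingSegments));
  }

  static PublishResultSketch ok(Map<String, String> upgradedPendingSegments)
  {
    return new PublishResultSketch(true, upgradedPendingSegments);
  }

  static PublishResultSketch fail()
  {
    return new PublishResultSketch(false, Collections.emptyMap());
  }

  boolean isSuccess()
  {
    return success;
  }

  Map<String, String> getUpgradedPendingSegments()
  {
    return upgradedPendingSegments;
  }
}

final class ReplaceActionSketch
{
  /**
   * Hypothetical caller: registers only the upgrades reported by the commit,
   * rather than re-reading every pending segment of the datasource.
   * Returns the number of upgrades registered.
   */
  static int registerUpgrades(PublishResultSketch result, Map<String, String> registry)
  {
    if (!result.isSuccess()) {
      return 0;
    }
    registry.putAll(result.getUpgradedPendingSegments());
    return result.getUpgradedPendingSegments().size();
  }
}
```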



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java:
##########
@@ -725,6 +725,7 @@ public void testMultipleGranularities()
     // Append segment for Oct-Dec
     final DataSegment segmentV02 = asSegment(pendingSegment02);
     appendTask2.commitAppendSegments(segmentV02);
+    appendTask2.finishRunAndGetStatus();

Review Comment:
   Why is this change needed?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -103,6 +103,11 @@ public class TaskLockbox
   // this set should be accessed under the giant lock.
   private final Set<String> activeTasks = new HashSet<>();
 
+  // Stores map of pending task group of tasks to the set of their ids.
+  // Useful for task replicas. Clean up pending segments only when the set is empty.
+  // this map should be accessed under the giant lock.

Review Comment:
   Rather than this comment, we can just add a `@GuardedBy("giant")` annotation.
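
A minimal sketch of the suggestion, assuming a hypothetical `LockboxSketch` in place of `TaskLockbox`, a `ReentrantLock` field named `giant`, and a local `GuardedBy` stand-in so it compiles with only the JDK (real code would use a library annotation such as JSR-305's `javax.annotation.concurrent.GuardedBy`; the stand-in itself enforces nothing). It also shows the replica bookkeeping described in the diff comment: a group is only eligible for cleanup once its last active task is removed.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/** Local stand-in for a library @GuardedBy annotation (documentation only). */
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.FIELD)
@interface GuardedBy
{
  String value();
}

/** Hypothetical, trimmed-down lockbox tracking active tasks per pending segment group. */
final class LockboxSketch
{
  private final ReentrantLock giant = new ReentrantLock(true);

  // The annotation states the locking contract instead of a free-text comment.
  @GuardedBy("giant")
  private final Map<String, Set<String>> activePendingSegmentGroupToTaskIds = new HashMap<>();

  void add(String taskId, String pendingSegmentGroup)
  {
    giant.lock();
    try {
      activePendingSegmentGroupToTaskIds
          .computeIfAbsent(pendingSegmentGroup, group -> new HashSet<>())
          .add(taskId);
    }
    finally {
      giant.unlock();
    }
  }

  /**
   * Removes the task and returns true only if it was the last active task in its
   * group, i.e. the point at which that group's pending segments could be cleaned up.
   */
  boolean remove(String taskId, String pendingSegmentGroup)
  {
    giant.lock();
    try {
      Set<String> taskIds = activePendingSegmentGroupToTaskIds.get(pendingSegmentGroup);
      if (taskIds == null || !taskIds.remove(taskId)) {
        return false;
      }
      if (taskIds.isEmpty()) {
        activePendingSegmentGroupToTaskIds.remove(pendingSegmentGroup);
        return true;
      }
      return false;
    }
    finally {
      giant.unlock();
    }
  }
}
```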



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -135,14 +135,16 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
     if (startMetadata == null) {
       publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
           segments,

Review Comment:
   At some point, the plan was to have one action for committing just segments 
and another action for committing both segments and metadata. So we decided to 
keep a method that would only commit segments.
   
   But we eventually decided against having the two actions, as they didn't 
serve much purpose. So now we can simplify the 
`IndexerMetadataStorageCoordinator` interface too.
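
A minimal sketch of what a simplified interface could look like, assuming a single hypothetical `commitAppendSegments` method whose start/end metadata arguments are optional. Segment ids and metadata are plain `String`s, and the compare-then-update check on the start metadata (including accepting any start when nothing is stored yet) is an assumption of this sketch, not Druid's exact semantics:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical single commit method: metadata is optional, so no "segments only" variant. */
interface SegmentCommitterSketch
{
  /**
   * Commits the segments. If startMetadata and endMetadata are both non-null, the
   * stored metadata is also moved from startMetadata to endMetadata in the same
   * (simulated) transaction. Returns false if the stored metadata does not match.
   */
  boolean commitAppendSegments(Set<String> segments, String startMetadata, String endMetadata);
}

/** In-memory implementation used only to exercise the sketch. */
final class InMemorySegmentCommitter implements SegmentCommitterSketch
{
  private final Set<String> committedSegments = new HashSet<>();
  private String storedMetadata = null;

  @Override
  public synchronized boolean commitAppendSegments(Set<String> segments, String startMetadata, String endMetadata)
  {
    if ((startMetadata == null) != (endMetadata == null)) {
      throw new IllegalArgumentException("startMetadata and endMetadata must both be null or both be non-null");
    }
    if (startMetadata != null) {
      // Assumption of this sketch: with no stored metadata yet, any start is accepted.
      if (storedMetadata != null && !storedMetadata.equals(startMetadata)) {
        return false;
      }
      storedMetadata = endMetadata;
    }
    committedSegments.addAll(segments);
    return true;
  }

  synchronized Set<String> getCommittedSegments()
  {
    return Collections.unmodifiableSet(new HashSet<>(committedSegments));
  }

  synchronized String getStoredMetadata()
  {
    return storedMetadata;
  }
}
```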



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    List<PendingSegment> pendingSegments

Review Comment:
   The method `tryUpgradePendingSegmentsOverlappingWith` needs to be renamed, as 
it no longer performs the upgrade.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java:
##########
@@ -279,6 +279,13 @@ public String getGroupId()
     return groupId;
   }
 
+  @Nullable
+  @Override
+  public String getPendingSegmentGroup()

Review Comment:
   Rather than returning `null`, this method should throw an 
`UnsupportedOperationException`. Only task types that actually need to allocate 
pending segments should return a value at all. This point should also be called 
out in the javadoc of the interface method.
   
   I would even advise creating a new interface, `PendingSegmentAllocatingTask` 
and implementing it only where needed. `SegmentTransactionalAppendAction` could 
check if the task at hand is an `instanceof PendingSegmentAllocatingTask` and 
then proceed.
   
   `PendingSegmentAllocatingTask` can extend `Task` interface if you want, for 
simplicity.
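
A minimal sketch of the suggested interface split, assuming hypothetical trimmed-down types (`SimpleTask` stands in for Druid's `Task`; the task classes and `AppendActionSketch` are illustrative only):

```java
/** Hypothetical, trimmed-down stand-in for Druid's Task interface. */
interface SimpleTask
{
  String getId();
}

/** Implemented only by task types that actually allocate pending segments. */
interface PendingSegmentAllocatingTask extends SimpleTask
{
  String getPendingSegmentGroup();
}

final class StreamingTaskSketch implements PendingSegmentAllocatingTask
{
  private final String id;
  private final String pendingSegmentGroup;

  StreamingTaskSketch(String id, String pendingSegmentGroup)
  {
    this.id = id;
    this.pendingSegmentGroup = pendingSegmentGroup;
  }

  @Override
  public String getId()
  {
    return id;
  }

  @Override
  public String getPendingSegmentGroup()
  {
    return pendingSegmentGroup;
  }
}

/** A task type that never allocates pending segments, so it does not implement the interface. */
final class KillTaskSketch implements SimpleTask
{
  private final String id;

  KillTaskSketch(String id)
  {
    this.id = id;
  }

  @Override
  public String getId()
  {
    return id;
  }
}

final class AppendActionSketch
{
  /** Mirrors the suggested instanceof check in the append action. */
  static String pendingSegmentGroupOf(SimpleTask task)
  {
    if (task instanceof PendingSegmentAllocatingTask) {
      return ((PendingSegmentAllocatingTask) task).getPendingSegmentGroup();
    }
    throw new IllegalArgumentException("Task[" + task.getId() + "] does not allocate pending segments");
  }
}
```

Task types that never allocate pending segments simply do not implement the interface, so there is no method for them to stub out with `null`.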



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java:
##########
@@ -133,13 +133,19 @@ private <V> void addToQueue(Command<V> command)
   private <V> V waitForCommandToFinish(Command<V> command)
   {
     try {
-      return command.value.get(10, TimeUnit.SECONDS);
+      return command.value.get(1000, TimeUnit.SECONDS);

Review Comment:
   This is a very long timeout for tests. Is this really needed?



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java:
##########
@@ -49,6 +53,9 @@ public class ActionsTestTask extends CommandQueueTask
 {
   private final TaskActionClient client;
   private final AtomicInteger sequenceId = new AtomicInteger(0);
+  private final Object lock = new Object();
+  @GuardedBy("lock")
+  private final Map<SegmentId, SegmentId> announcedSegmentsToParentSegments = new HashMap<>();

Review Comment:
   Why do we need locks here? The only thread writing/reading from this field 
seems to be the test thread itself.



##########
server/src/main/java/org/apache/druid/metadata/PendingSegment.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.sql.ResultSet;
+
+public class PendingSegment
+{
+  private final SegmentIdWithShardSpec id;
+  private final String sequenceName;
+  private final String sequencePrevId;
+  private final String parentId;
+  private final String taskGroup;
+
+  @JsonCreator

Review Comment:
   Is this class ever deserialized?



##########
server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java:
##########
@@ -574,7 +574,7 @@ private void insertIntoUpgradeSegmentsTable(Map<DataSegment, ReplaceTaskLock> se
     );
   }
 
-  @Test
+  //@Test

Review Comment:
   Why have we disabled this?



##########
server/src/main/java/org/apache/druid/metadata/PendingSegment.java:
##########
@@ -0,0 +1,128 @@
+public class PendingSegment

Review Comment:
   Let's call it `PendingSegmentRecord` as it is meant to mirror an entry in 
the pending segments table.
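
A minimal sketch of such a record, assuming it is only ever built from metadata query results and never (de)serialized as JSON, so it carries no Jackson annotations. The segment id is a plain `String` standing in for `SegmentIdWithShardSpec`; field names and nullability follow the `PendingSegment` class in this PR:

```java
import java.util.Objects;

/** Hypothetical immutable value mirroring one row of the pending segments table. */
final class PendingSegmentRecord
{
  private final String id;              // stand-in for SegmentIdWithShardSpec
  private final String sequenceName;
  private final String sequencePrevId;
  private final String parentId;        // nullable, as in the original constructor
  private final String taskGroup;       // nullable, as in the original constructor

  PendingSegmentRecord(String id, String sequenceName, String sequencePrevId, String parentId, String taskGroup)
  {
    this.id = Objects.requireNonNull(id, "id");
    this.sequenceName = Objects.requireNonNull(sequenceName, "sequenceName");
    this.sequencePrevId = Objects.requireNonNull(sequencePrevId, "sequencePrevId");
    this.parentId = parentId;
    this.taskGroup = taskGroup;
  }

  String getId()
  {
    return id;
  }

  String getSequenceName()
  {
    return sequenceName;
  }

  String getSequencePrevId()
  {
    return sequencePrevId;
  }

  String getParentId()
  {
    return parentId;
  }

  String getTaskGroup()
  {
    return taskGroup;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof PendingSegmentRecord)) {
      return false;
    }
    PendingSegmentRecord that = (PendingSegmentRecord) o;
    return id.equals(that.id)
           && sequenceName.equals(that.sequenceName)
           && sequencePrevId.equals(that.sequencePrevId)
           && Objects.equals(parentId, that.parentId)
           && Objects.equals(taskGroup, that.taskGroup);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(id, sequenceName, sequencePrevId, parentId, taskGroup);
  }
}
```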



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -475,4 +476,8 @@ SegmentPublishResult commitMetadataOnly(
    * @return number of deleted entries from the metadata store
    */
   int deleteUpgradeSegmentsForTask(String taskId);
+
+  int deletePendingSegmentsForTaskGroup(String taskGroup);

Review Comment:
   Since we are leaving out the cleanup, we don't need this method for now.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java:
##########
@@ -101,6 +101,12 @@ public interface Task
    */
   String getGroupId();
 
+  /**
+   * The group used to associate a pending segment with a task
+   * @return task group for pending segment allocations
+   */
+  String getPendingSegmentGroup();

Review Comment:
   If we end up declaring this method on the `Task` interface, it should have a 
default implementation that throws `UnsupportedOperationException`.
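
A minimal sketch of that default, assuming a hypothetical trimmed-down `TaskSketch` interface in place of `Task`. The streaming task returns its availability group, as `SeekableStreamIndexTask` does in this PR; the other task types are illustrative only:

```java
/** Hypothetical, trimmed-down stand-in for Druid's Task interface. */
interface TaskSketch
{
  String getId();

  /**
   * Unique string used by a task (or its sub-tasks and replicas) to allocate
   * pending segments and identify pending segments allocated to it.
   *
   * @throws UnsupportedOperationException if this task type does not allocate pending segments
   */
  default String getPendingSegmentGroup()
  {
    throw new UnsupportedOperationException(
        "Task[" + getId() + "] does not allocate pending segments"
    );
  }
}

/** Overrides the default and returns its availability group. */
final class StreamingIngestTaskSketch implements TaskSketch
{
  private final String id;
  private final String availabilityGroup;

  StreamingIngestTaskSketch(String id, String availabilityGroup)
  {
    this.id = id;
    this.availabilityGroup = availabilityGroup;
  }

  @Override
  public String getId()
  {
    return id;
  }

  @Override
  public String getPendingSegmentGroup()
  {
    return availabilityGroup;
  }
}

/** Keeps the default, so asking it for a pending segment group fails loudly. */
final class CompactTaskSketch implements TaskSketch
{
  private final String id;

  CompactTaskSketch(String id)
  {
    this.id = id;
  }

  @Override
  public String getId()
  {
    return id;
  }
}
```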



##########
server/src/main/java/org/apache/druid/metadata/PendingSegment.java:
##########
@@ -0,0 +1,128 @@
+public class PendingSegment
+{
+  private final SegmentIdWithShardSpec id;
+  private final String sequenceName;
+  private final String sequencePrevId;
+  private final String parentId;
+  private final String taskGroup;
+
+  @JsonCreator
+  public PendingSegment(
+      @JsonProperty("id") SegmentIdWithShardSpec id,
+      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("sequencePrevId") String sequencePrevId,
+      @JsonProperty("parentId") @Nullable String parentId,
+      @JsonProperty("taskGroup") @Nullable String taskGroup
+  )
+  {
+    this.id = id;
+    this.sequenceName = sequenceName;
+    this.sequencePrevId = sequencePrevId;
+    this.parentId = parentId;
+    this.taskGroup = taskGroup;
+  }
+
+  @JsonProperty
+  public SegmentIdWithShardSpec getId()
+  {
+    return id;
+  }
+
+  @JsonProperty

Review Comment:
   Is this class ever serialized?



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java:
##########
@@ -222,11 +224,16 @@ public TaskActionToolbox createTaskActionToolbox()
         taskStorage,
         storageCoordinator,
         new NoopServiceEmitter(),
-        null,
+        supervisorManager,
         objectMapper
     );
   }
 
+  protected void setSupervisorManager(SupervisorManager supervisorManager)

Review Comment:
   Rather than this, have a separate `createTaskActionToolbox` method that 
accepts a `SupervisorManager`.
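
A minimal sketch of the overload, assuming hypothetical stand-ins (`SupervisorManagerStub`, `ToolboxSketch`, `IngestionTestBaseSketch`) for `SupervisorManager`, `TaskActionToolbox` and `IngestionTestBase`:

```java
/** Hypothetical stand-in for SupervisorManager. */
final class SupervisorManagerStub
{
}

/** Hypothetical stand-in for TaskActionToolbox; supervisorManager may be null. */
final class ToolboxSketch
{
  private final SupervisorManagerStub supervisorManager;

  ToolboxSketch(SupervisorManagerStub supervisorManager)
  {
    this.supervisorManager = supervisorManager;
  }

  SupervisorManagerStub getSupervisorManager()
  {
    return supervisorManager;
  }
}

class IngestionTestBaseSketch
{
  /** Unchanged behaviour for existing tests: no supervisor manager. */
  public ToolboxSketch createTaskActionToolbox()
  {
    return createTaskActionToolbox(null);
  }

  /** Overload for tests that need a supervisor manager; no mutable field or setter required. */
  public ToolboxSketch createTaskActionToolbox(SupervisorManagerStub supervisorManager)
  {
    return new ToolboxSketch(supervisorManager);
  }
}
```

Existing tests keep calling the no-arg method, and tests that need a supervisor manager pass it explicitly instead of mutating shared state in the base class.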



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java:
##########
@@ -101,28 +121,10 @@ public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Gr
             TaskLockType.APPEND
         )
     );
-  }
-
-  public SegmentIdWithShardSpec allocateSegmentForTimestamp(

Review Comment:
   Was this method not being used anywhere?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java:
##########
@@ -156,6 +156,13 @@ public Set<ResourceAction> getInputSourceResources()
     return ImmutableSet.of();
   }
 
+  @JsonIgnore

Review Comment:
   Yeah, most likely not required. See `getPriority()` and `isReady()`. They 
have not been annotated with either `@JsonProperty` or `@JsonIgnore` but don't 
get serialized out.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java:
##########
@@ -101,6 +101,12 @@ public interface Task
    */
   String getGroupId();
 
+  /**
+   * The group used to associate a pending segment with a task
+   * @return task group for pending segment allocations

Review Comment:
   The definition of this field should say something like:
   
   ```
   Unique string used by a task (or its sub-tasks and replicas) to allocate 
pending segments and identify pending segments allocated to it.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -213,6 +218,13 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
           activeTasks.remove(task.getId());
         }
       }
+      activePendingTaskGroupToTaskIds.clear();
+      for (Task task : storedActiveTasks) {
+        if (activeTasks.contains(task.getId()) && task.getPendingSegmentGroup() != null) {

Review Comment:
   Once the Overlord is upgraded to a version that contains these changes, the 
method `task.getPendingSegmentGroup()` would never return null irrespective of 
when the task was created, because the `pendingSegmentGroupId` is not a 
serialized field of the task.
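
A minimal sketch of why the null check cannot identify tasks created before the upgrade, assuming a hypothetical task whose stored payload (modeled as a `Map<String, String>`) holds only an id and an availability group, and whose pending segment group is derived from the availability group on read, as `SeekableStreamIndexTask` does in this PR:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical task restored from a stored payload. The payload never contains a
 * pending segment group; the value is derived from the availability group on read.
 */
final class RestoredTaskSketch
{
  private final String id;
  private final String availabilityGroup;

  private RestoredTaskSketch(String id, String availabilityGroup)
  {
    this.id = id;
    this.availabilityGroup = availabilityGroup;
  }

  /** Stand-in for deserializing a task payload read from the metadata store. */
  static RestoredTaskSketch fromPayload(Map<String, String> payload)
  {
    return new RestoredTaskSketch(payload.get("id"), payload.get("availabilityGroup"));
  }

  String getId()
  {
    return id;
  }

  /** Derived, not serialized: non-null whenever the payload has an availability group. */
  String getPendingSegmentGroup()
  {
    return availabilityGroup;
  }
}
```

A payload written before the upgrade still yields a non-null group once the new code reads it, so a `!= null` check on this value says nothing about when the task was created.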



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -324,7 +326,8 @@ SegmentPublishResult commitSegmentsAndMetadata(
    */
   SegmentPublishResult commitAppendSegments(
       Set<DataSegment> appendSegments,
-      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      String taskGroup

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.