This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ff97c67945e Fix batch segment allocation failure with replicas (#17262)
ff97c67945e is described below

commit ff97c67945ed8bbf453b4c90ee9d4cac52be61cb
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Oct 7 19:52:38 2024 +0530

    Fix batch segment allocation failure with replicas (#17262)
    
    Fixes #16587
    
    Streaming ingestion tasks operate by allocating segments before ingesting 
rows.
    These allocations happen across replicas, which may send different requests
    but must get the same segment id for a given (datasource, interval, version,
    sequenceName) across replicas.
    
    This patch fixes the bug by ignoring the previousSegmentId when 
skipLineageCheck is true.
---
 .../common/actions/SegmentAllocateActionTest.java  | 101 +++++++++++++++++++++
 .../IndexerSQLMetadataStorageCoordinator.java      |   3 +-
 2 files changed, 103 insertions(+), 1 deletion(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index 02a3da0e1d0..fdb7fcd5595 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentLock;
 import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -34,6 +35,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@@ -61,11 +63,18 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -122,6 +131,63 @@ public class SegmentAllocateActionTest
     }
   }
 
+  @Test
+  public void testManySegmentsSameInterval_noLineageCheck() throws Exception
+  {
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return;
+    }
+
+    final Task task = NoopTask.create();
+    final int numTasks = 2;
+    final int numRequests = 200;
+
+    taskActionTestKit.getTaskLockbox().add(task);
+
+    ExecutorService allocatorService = Execs.multiThreaded(4, "allocator-%d");
+
+    final List<Callable<SegmentIdWithShardSpec>> allocateTasks = new 
ArrayList<>();
+    for (int i = 0; i < numRequests; i++) {
+      final String sequence = "sequence_" + (i % numTasks);
+      allocateTasks.add(() -> allocateWithoutLineageCheck(
+          task,
+          PARTY_TIME,
+          Granularities.NONE,
+          Granularities.HOUR,
+          sequence,
+          TaskLockType.APPEND
+      ));
+    }
+
+    Set<SegmentIdWithShardSpec> allocatedIds = new HashSet<>();
+    for (Future<SegmentIdWithShardSpec> future : 
allocatorService.invokeAll(allocateTasks)) {
+      allocatedIds.add(future.get());
+    }
+
+    Thread.sleep(1_000);
+    for (Future<SegmentIdWithShardSpec> future : 
allocatorService.invokeAll(allocateTasks)) {
+      allocatedIds.add(future.get());
+    }
+
+
+    final TaskLock lock = Iterables.getOnlyElement(
+        
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
+                      .filter(input -> 
input.getInterval().contains(PARTY_TIME))
+    );
+    Set<SegmentIdWithShardSpec> expectedIds = new HashSet<>();
+    for (int i = 0; i < numTasks; i++) {
+      expectedIds.add(
+          new SegmentIdWithShardSpec(
+              DATA_SOURCE,
+              Granularities.HOUR.bucket(PARTY_TIME),
+              lock.getVersion(),
+              new NumberedShardSpec(i, 0)
+          )
+      );
+    }
+    Assert.assertEquals(expectedIds, allocatedIds);
+  }
+
   @Test
   public void testManySegmentsSameInterval()
   {
@@ -1122,6 +1188,41 @@ public class SegmentAllocateActionTest
     );
   }
 
+  private SegmentIdWithShardSpec allocateWithoutLineageCheck(
+      final Task task,
+      final DateTime timestamp,
+      final Granularity queryGranularity,
+      final Granularity preferredSegmentGranularity,
+      final String sequenceName,
+      final TaskLockType taskLockType
+  )
+  {
+    final SegmentAllocateAction action = new SegmentAllocateAction(
+        DATA_SOURCE,
+        timestamp,
+        queryGranularity,
+        preferredSegmentGranularity,
+        sequenceName,
+        // prevSegmentId can vary across replicas and isn't deterministic
+        "random_" + ThreadLocalRandom.current().nextInt(),
+        true,
+        NumberedPartialShardSpec.instance(),
+        lockGranularity,
+        taskLockType
+    );
+
+    try {
+      if (useBatch) {
+        return action.performAsync(task, 
taskActionTestKit.getTaskActionToolbox()).get();
+      } else {
+        return action.perform(task, taskActionTestKit.getTaskActionToolbox());
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private SegmentIdWithShardSpec allocate(
       final Task task,
       final DateTime timestamp,
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 96cfd5dbf04..463232012ed 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1326,7 +1326,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     {
       this.interval = interval;
       this.sequenceName = request.getSequenceName();
-      this.previousSegmentId = request.getPreviousSegmentId();
+      // Even if the previousSegmentId is set, disregard it when skipping 
lineage check for streaming ingestion
+      this.previousSegmentId = skipSegmentLineageCheck ? null : 
request.getPreviousSegmentId();
       this.skipSegmentLineageCheck = skipSegmentLineageCheck;
 
       this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId, 
skipSegmentLineageCheck);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to