This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ff97c67945e Fix batch segment allocation failure with replicas (#17262)
ff97c67945e is described below
commit ff97c67945ed8bbf453b4c90ee9d4cac52be61cb
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Oct 7 19:52:38 2024 +0530
Fix batch segment allocation failure with replicas (#17262)
Fixes #16587
Streaming ingestion tasks operate by allocating segments before ingesting rows.
These allocations happen across replicas, which may send different requests but
must receive the same segment id for a given (datasource, interval, version,
sequenceName) tuple across replicas.
This patch fixes the bug by ignoring the previousSegmentId when
skipSegmentLineageCheck is true.
---
.../common/actions/SegmentAllocateActionTest.java | 101 +++++++++++++++++++++
.../IndexerSQLMetadataStorageCoordinator.java | 3 +-
2 files changed, 103 insertions(+), 1 deletion(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index 02a3da0e1d0..fdb7fcd5595 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -34,6 +35,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@@ -61,11 +63,18 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -122,6 +131,63 @@ public class SegmentAllocateActionTest
}
}
+ @Test
+ public void testManySegmentsSameInterval_noLineageCheck() throws Exception
+ {
+ if (lockGranularity == LockGranularity.SEGMENT) {
+ return;
+ }
+
+ final Task task = NoopTask.create();
+ final int numTasks = 2;
+ final int numRequests = 200;
+
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ ExecutorService allocatorService = Execs.multiThreaded(4, "allocator-%d");
+
+ final List<Callable<SegmentIdWithShardSpec>> allocateTasks = new
ArrayList<>();
+ for (int i = 0; i < numRequests; i++) {
+ final String sequence = "sequence_" + (i % numTasks);
+ allocateTasks.add(() -> allocateWithoutLineageCheck(
+ task,
+ PARTY_TIME,
+ Granularities.NONE,
+ Granularities.HOUR,
+ sequence,
+ TaskLockType.APPEND
+ ));
+ }
+
+ Set<SegmentIdWithShardSpec> allocatedIds = new HashSet<>();
+ for (Future<SegmentIdWithShardSpec> future :
allocatorService.invokeAll(allocateTasks)) {
+ allocatedIds.add(future.get());
+ }
+
+ Thread.sleep(1_000);
+ for (Future<SegmentIdWithShardSpec> future :
allocatorService.invokeAll(allocateTasks)) {
+ allocatedIds.add(future.get());
+ }
+
+
+ final TaskLock lock = Iterables.getOnlyElement(
+
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
+ .filter(input ->
input.getInterval().contains(PARTY_TIME))
+ );
+ Set<SegmentIdWithShardSpec> expectedIds = new HashSet<>();
+ for (int i = 0; i < numTasks; i++) {
+ expectedIds.add(
+ new SegmentIdWithShardSpec(
+ DATA_SOURCE,
+ Granularities.HOUR.bucket(PARTY_TIME),
+ lock.getVersion(),
+ new NumberedShardSpec(i, 0)
+ )
+ );
+ }
+ Assert.assertEquals(expectedIds, allocatedIds);
+ }
+
@Test
public void testManySegmentsSameInterval()
{
@@ -1122,6 +1188,41 @@ public class SegmentAllocateActionTest
);
}
+ private SegmentIdWithShardSpec allocateWithoutLineageCheck(
+ final Task task,
+ final DateTime timestamp,
+ final Granularity queryGranularity,
+ final Granularity preferredSegmentGranularity,
+ final String sequenceName,
+ final TaskLockType taskLockType
+ )
+ {
+ final SegmentAllocateAction action = new SegmentAllocateAction(
+ DATA_SOURCE,
+ timestamp,
+ queryGranularity,
+ preferredSegmentGranularity,
+ sequenceName,
+ // prevSegmentId can vary across replicas and isn't deterministic
+ "random_" + ThreadLocalRandom.current().nextInt(),
+ true,
+ NumberedPartialShardSpec.instance(),
+ lockGranularity,
+ taskLockType
+ );
+
+ try {
+ if (useBatch) {
+ return action.performAsync(task,
taskActionTestKit.getTaskActionToolbox()).get();
+ } else {
+ return action.perform(task, taskActionTestKit.getTaskActionToolbox());
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private SegmentIdWithShardSpec allocate(
final Task task,
final DateTime timestamp,
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 96cfd5dbf04..463232012ed 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1326,7 +1326,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
{
this.interval = interval;
this.sequenceName = request.getSequenceName();
- this.previousSegmentId = request.getPreviousSegmentId();
+ // Even if the previousSegmentId is set, disregard it when skipping
lineage check for streaming ingestion
+ this.previousSegmentId = skipSegmentLineageCheck ? null :
request.getPreviousSegmentId();
this.skipSegmentLineageCheck = skipSegmentLineageCheck;
this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId,
skipSegmentLineageCheck);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]