This is an automated email from the ASF dual-hosted git repository.
GWphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d4eefcc538e fix: Fix race in LocalIntermediaryDataManager.addSegment
(#19446)
d4eefcc538e is described below
commit d4eefcc538ebd4b403d50b17328760f205b69905
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Wed May 13 14:32:56 2026 +0530
fix: Fix race in LocalIntermediaryDataManager.addSegment (#19446)
* Fix race in LocalIntermediaryDataManager.addSegment using
ConcurrentHashMap + AtomicInteger (#19443)
* Add concurrency unit test for LocalIntermediaryDataManager.addSegment
---
.../shuffle/LocalIntermediaryDataManager.java | 24 +--
...ocalIntermediaryDataManagerConcurrencyTest.java | 178 +++++++++++++++++++++
2 files changed, 186 insertions(+), 16 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
index c510cd14b4d..1974b096be2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.worker.shuffle;
import com.google.common.base.Throwables;
-import com.google.common.collect.Iterators;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
@@ -59,9 +58,7 @@ import org.joda.time.Period;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -72,8 +69,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* In native parallel indexing, this class store segment files of phase 1
tasks in local storage of middleManagers (or indexer)
@@ -102,8 +99,8 @@ public class LocalIntermediaryDataManager implements
IntermediaryDataManager
// the supervisor.
private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes =
new ConcurrentHashMap<>();
- // supervisorTaskId -> cyclic iterator of storage locations
- private final Map<String, Iterator<StorageLocation>> locationIterators = new
HashMap<>();
+ // supervisorTaskId -> atomic round-robin cursor into shuffleDataLocations
+ private final ConcurrentHashMap<String, AtomicInteger> locationCursors = new
ConcurrentHashMap<>();
// The overlord is supposed to send a cleanup request as soon as the
supervisorTask is finished in parallel indexing,
// but middleManager or indexer could miss the request. This executor is to
automatically clean up unused intermediary
@@ -277,16 +274,9 @@ public class LocalIntermediaryDataManager implements
IntermediaryDataManager
public DataSegment addSegment(String supervisorTaskId, String subTaskId,
DataSegment segment, File segmentDir)
throws IOException
{
- // Get or create the location iterator for supervisorTask.
- final Iterator<StorageLocation> iterator =
locationIterators.computeIfAbsent(
+ final AtomicInteger cursor = locationCursors.computeIfAbsent(
supervisorTaskId,
- k -> {
- final Iterator<StorageLocation> cyclicIterator =
Iterators.cycle(shuffleDataLocations);
- // Random start of the iterator
- final int random =
ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
- IntStream.range(0, random).forEach(i -> cyclicIterator.next());
- return cyclicIterator;
- }
+ k -> new
AtomicInteger(ThreadLocalRandom.current().nextInt(shuffleDataLocations.size()))
);
// Create a zipped segment in a temp directory.
@@ -326,7 +316,9 @@ public class LocalIntermediaryDataManager implements
IntermediaryDataManager
// Try copying the zipped segment to one of storage locations
for (int i = 0; i < shuffleDataLocations.size(); i++) {
- final StorageLocation location = iterator.next();
+ final StorageLocation location = shuffleDataLocations.get(
+ Math.floorMod(cursor.getAndIncrement(),
shuffleDataLocations.size())
+ );
final String partitionFilePath = getPartitionFilePath(
supervisorTaskId,
subTaskId,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerConcurrencyTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerConcurrencyTest.java
new file mode 100644
index 00000000000..142887d7837
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerConcurrencyTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.timeline.partition.ShardSpecLookup;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies that {@link LocalIntermediaryDataManager#addSegment} is safe under concurrent
+ * invocation by multiple appender threads sharing the same supervisorTaskId.
+ *
+ * Before the thread-safe cursor introduced for issue #19443, the per-supervisor location
+ * iterators lived in a plain {@link java.util.HashMap}, and concurrent calls could surface a
+ * {@link java.util.ConcurrentModificationException} on that map or a
+ * {@link java.util.NoSuchElementException} on the underlying cyclic iterator. Data races are
+ * nondeterministic, so a passing run is evidence of thread safety rather than a proof of it.
+ */
+public class LocalIntermediaryDataManagerConcurrencyTest
+{
+  private static final int LOCATION_COUNT = 4;
+  // 1 GiB per location; ample so that space reservation never fails during the test.
+  private static final long LOCATION_CAPACITY_BYTES = 1_073_741_824L;
+  private static final int THREADS = 16;
+  private static final int CALLS_PER_THREAD = 200;
+  private static final String SUPERVISOR_TASK_ID = "supervisorTaskId";
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  private LocalIntermediaryDataManager intermediaryDataManager;
+  private File sharedSegmentDir;
+
+  @Before
+  public void setUp() throws IOException
+  {
+    final WorkerConfig workerConfig = new WorkerConfig();
+    final ImmutableList.Builder<StorageLocationConfig> locations = ImmutableList.builder();
+    for (int i = 0; i < LOCATION_COUNT; i++) {
+      locations.add(new StorageLocationConfig(tempDir.newFolder("loc_" + i), LOCATION_CAPACITY_BYTES, null));
+    }
+    final TaskConfig taskConfig = new TaskConfigBuilder()
+        .setShuffleDataLocations(locations.build())
+        .build();
+    final OverlordClient overlordClient = new NoopOverlordClient();
+    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient);
+    intermediaryDataManager.start();
+    // Pre-built shared input dir keeps per-call work small so the race window dominates wall time.
+    sharedSegmentDir = tempDir.newFolder("shared_input");
+    FileUtils.write(new File(sharedSegmentDir, "data.txt"), "x", StandardCharsets.UTF_8);
+    FileUtils.writeByteArrayToFile(new File(sharedSegmentDir, "version.bin"), Ints.toByteArray(9));
+  }
+
+  @After
+  public void tearDown()
+  {
+    intermediaryDataManager.stop();
+  }
+
+  @Test(timeout = 90_000)
+  public void testConcurrentAddSegmentSharedSupervisorIsThreadSafe() throws Exception
+  {
+    final Interval interval = Intervals.of("2018/2019");
+    final ExecutorService executor = Execs.multiThreaded(
+        THREADS,
+        "local-intermediary-data-manager-concurrency-test-%d"
+    );
+    final CountDownLatch startGate = new CountDownLatch(1);
+    try {
+      final List<Future<Void>> futures = new ArrayList<>(THREADS);
+      for (int t = 0; t < THREADS; t++) {
+        final int threadId = t;
+        futures.add(executor.submit(() -> {
+          // Hold every worker at the gate so all threads start calling addSegment together.
+          startGate.await();
+          for (int call = 0; call < CALLS_PER_THREAD; call++) {
+            final String subTaskId = "thread_" + threadId + "_call_" + call;
+            // Each call gets a distinct bucket id and subTaskId.
+            final DataSegment segment = newSegment(interval, threadId * CALLS_PER_THREAD + call);
+            intermediaryDataManager.addSegment(SUPERVISOR_TASK_ID, subTaskId, segment, sharedSegmentDir);
+          }
+          return null;
+        }));
+      }
+
+      startGate.countDown();
+
+      // Collect results; any exception inside a worker (e.g. CME / NoSuchElementException)
+      // surfaces here as an ExecutionException and fails the test.
+      for (Future<Void> f : futures) {
+        f.get(60, TimeUnit.SECONDS);
+      }
+      executor.shutdown();
+      Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+    }
+    finally {
+      // Release the gate and stop the pool even when a worker failed or a get() timed out,
+      // so worker threads do not leak past this test. Both calls are no-ops on the success path.
+      startGate.countDown();
+      executor.shutdownNow();
+    }
+  }
+
+  private DataSegment newSegment(Interval interval, int bucketId)
+  {
+    return new DataSegment(
+        "dataSource",
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        new TestShardSpec(bucketId),
+        9,
+        10
+    );
+  }
+
+  /**
+   * Minimal bucket-numbered shard spec; only {@link #getBucketId()} is exercised by this test.
+   */
+  private static class TestShardSpec implements BucketNumberedShardSpec<BuildingShardSpec<ShardSpec>>
+  {
+    private final int bucketId;
+
+    private TestShardSpec(int bucketId)
+    {
+      this.bucketId = bucketId;
+    }
+
+    @Override
+    public int getBucketId()
+    {
+      return bucketId;
+    }
+
+    @Override
+    public BuildingShardSpec<ShardSpec> convert(int partitionId)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]