This is an automated email from the ASF dual-hosted git repository.

GWphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d4eefcc538e fix: Fix race in LocalIntermediaryDataManager.addSegment  (#19446)
d4eefcc538e is described below

commit d4eefcc538ebd4b403d50b17328760f205b69905
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Wed May 13 14:32:56 2026 +0530

    fix: Fix race in LocalIntermediaryDataManager.addSegment  (#19446)
    
    * Fix race in LocalIntermediaryDataManager.addSegment using ConcurrentHashMap + AtomicInteger (#19443)
    
    * Add concurrency unit test for LocalIntermediaryDataManager.addSegment
---
 .../shuffle/LocalIntermediaryDataManager.java      |  24 +--
 ...ocalIntermediaryDataManagerConcurrencyTest.java | 178 +++++++++++++++++++++
 2 files changed, 186 insertions(+), 16 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
index c510cd14b4d..1974b096be2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.Iterators;
 import com.google.common.io.ByteSource;
 import com.google.common.io.Files;
 import com.google.inject.Inject;
@@ -59,9 +58,7 @@ import org.joda.time.Period;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -72,8 +69,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * In native parallel indexing, this class store segment files of phase 1 
tasks in local storage of middleManagers (or indexer)
@@ -102,8 +99,8 @@ public class LocalIntermediaryDataManager implements 
IntermediaryDataManager
   // the supervisor.
   private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = 
new ConcurrentHashMap<>();
 
-  // supervisorTaskId -> cyclic iterator of storage locations
-  private final Map<String, Iterator<StorageLocation>> locationIterators = new 
HashMap<>();
+  // supervisorTaskId -> atomic round-robin cursor into shuffleDataLocations
+  private final ConcurrentHashMap<String, AtomicInteger> locationCursors = new 
ConcurrentHashMap<>();
 
   // The overlord is supposed to send a cleanup request as soon as the 
supervisorTask is finished in parallel indexing,
   // but middleManager or indexer could miss the request. This executor is to 
automatically clean up unused intermediary
@@ -277,16 +274,9 @@ public class LocalIntermediaryDataManager implements 
IntermediaryDataManager
   public DataSegment addSegment(String supervisorTaskId, String subTaskId, 
DataSegment segment, File segmentDir)
       throws IOException
   {
-    // Get or create the location iterator for supervisorTask.
-    final Iterator<StorageLocation> iterator = 
locationIterators.computeIfAbsent(
+    final AtomicInteger cursor = locationCursors.computeIfAbsent(
         supervisorTaskId,
-        k -> {
-          final Iterator<StorageLocation> cyclicIterator = 
Iterators.cycle(shuffleDataLocations);
-          // Random start of the iterator
-          final int random = 
ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
-          IntStream.range(0, random).forEach(i -> cyclicIterator.next());
-          return cyclicIterator;
-        }
+        k -> new 
AtomicInteger(ThreadLocalRandom.current().nextInt(shuffleDataLocations.size()))
     );
 
     // Create a zipped segment in a temp directory.
@@ -326,7 +316,9 @@ public class LocalIntermediaryDataManager implements 
IntermediaryDataManager
 
       // Try copying the zipped segment to one of storage locations
       for (int i = 0; i < shuffleDataLocations.size(); i++) {
-        final StorageLocation location = iterator.next();
+        final StorageLocation location = shuffleDataLocations.get(
+            Math.floorMod(cursor.getAndIncrement(), 
shuffleDataLocations.size())
+        );
         final String partitionFilePath = getPartitionFilePath(
             supervisorTaskId,
             subTaskId,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerConcurrencyTest.java
new file mode 100644
index 00000000000..142887d7837
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerConcurrencyTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.timeline.partition.ShardSpecLookup;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stress-tests {@link LocalIntermediaryDataManager#addSegment} with THREADS threads sharing one supervisorTaskId;
+ * the test fails if any call throws or times out. It does not check how segments spread across storage locations.
+ * NOTE(review): if a future fails, executor.shutdown() below is skipped, so the pool's threads may linger.
+ * Before the thread-safe cursor added for issue #19443, such concurrent calls could surface a
+ * {@link java.util.ConcurrentModificationException} on the location map or a {@link java.util.NoSuchElementException}
+ * on the shared iterator. The race is timing-dependent, so a passing run is evidence, not proof, of thread-safety.
+ */
+public class LocalIntermediaryDataManagerConcurrencyTest
+{
+  private static final int LOCATION_COUNT = 4;
+  private static final long LOCATION_CAPACITY_BYTES = 1_073_741_824L; // 1 
GiB; ample so reservation never fails
+  private static final int THREADS = 16;
+  private static final int CALLS_PER_THREAD = 200;
+  private static final String SUPERVISOR_TASK_ID = "supervisorTaskId";
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  private LocalIntermediaryDataManager intermediaryDataManager;
+  private File sharedSegmentDir;
+
+  @Before
+  public void setUp() throws IOException
+  {
+    final WorkerConfig workerConfig = new WorkerConfig();
+    final ImmutableList.Builder<StorageLocationConfig> locations = 
ImmutableList.builder();
+    for (int i = 0; i < LOCATION_COUNT; i++) {
+      locations.add(new StorageLocationConfig(tempDir.newFolder("loc_" + i), 
LOCATION_CAPACITY_BYTES, null));
+    }
+    final TaskConfig taskConfig = new TaskConfigBuilder()
+        .setShuffleDataLocations(locations.build())
+        .build();
+    final OverlordClient overlordClient = new NoopOverlordClient();
+    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, 
taskConfig, overlordClient);
+    intermediaryDataManager.start();
+    // Pre-built shared input dir keeps per-call work small so the race window 
dominates wall time.
+    sharedSegmentDir = tempDir.newFolder("shared_input");
+    FileUtils.write(new File(sharedSegmentDir, "data.txt"), "x", 
StandardCharsets.UTF_8);
+    FileUtils.writeByteArrayToFile(new File(sharedSegmentDir, "version.bin"), 
Ints.toByteArray(9));
+  }
+
+  @After
+  public void tearDown()
+  {
+    intermediaryDataManager.stop();
+  }
+
+  @Test(timeout = 90_000)
+  public void testConcurrentAddSegmentSharedSupervisorIsThreadSafe() throws 
Exception
+  {
+    final Interval interval = Intervals.of("2018/2019");
+    final ExecutorService executor = Execs.multiThreaded(THREADS, 
"local-intermediary-data-manager-concurrency-test-%d");
+    final CountDownLatch startGate = new CountDownLatch(1);
+
+    final List<Future<Void>> futures = new ArrayList<>(THREADS);
+    for (int t = 0; t < THREADS; t++) {
+      final int threadId = t;
+      futures.add(executor.submit(() -> {
+        startGate.await();
+        for (int call = 0; call < CALLS_PER_THREAD; call++) {
+          final String subTaskId = "thread_" + threadId + "_call_" + call;
+          final DataSegment segment = newSegment(interval, threadId * 
CALLS_PER_THREAD + call);
+          intermediaryDataManager.addSegment(SUPERVISOR_TASK_ID, subTaskId, 
segment, sharedSegmentDir);
+        }
+        return null;
+      }));
+    }
+
+    startGate.countDown();
+
+    // Collect results; any exception inside a worker (CME / 
NoSuchElementException)
+    // surfaces here as ExecutionException and fails the test.
+    for (Future<Void> f : futures) {
+      f.get(60, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  private DataSegment newSegment(Interval interval, int bucketId)
+  {
+    return new DataSegment(
+        "dataSource",
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        new TestShardSpec(bucketId),
+        9,
+        10
+    );
+  }
+
+  private static class TestShardSpec implements 
BucketNumberedShardSpec<BuildingShardSpec<ShardSpec>>
+  {
+    private final int bucketId;
+
+    private TestShardSpec(int bucketId)
+    {
+      this.bucketId = bucketId;
+    }
+
+    @Override
+    public int getBucketId()
+    {
+      return bucketId;
+    }
+
+    @Override
+    public BuildingShardSpec<ShardSpec> convert(int partitionId)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to