This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 54d0e482dc5 Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction (#15699)
54d0e482dc5 is described below

commit 54d0e482dc560f42194c613d1e1bcd5310f281c2
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Jan 29 19:18:43 2024 +0530

    Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction (#15699)
    
    Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  15 +-
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 139 +++++++++++++-
 .../druid/msq/test/MSQTestTaskActionClient.java    |  24 ---
 .../ActionBasedUsedSegmentChecker.java             |   3 +-
 .../actions/RetrieveSegmentsToReplaceAction.java   | 202 ---------------------
 .../common/actions/RetrieveUsedSegmentsAction.java |  96 +++++++++-
 .../druid/indexing/common/actions/TaskAction.java  |   1 -
 .../druid/indexing/common/config/TaskConfig.java   |  27 +--
 .../common/task/AbstractBatchIndexTask.java        |   3 +-
 .../druid/indexing/common/task/CompactionTask.java |   7 +-
 .../task/batch/parallel/TombstoneHelper.java       |   5 +-
 .../druid/indexing/input/DruidInputSource.java     |   9 +-
 .../ActionBasedUsedSegmentCheckerTest.java         |   7 +-
 .../actions/RetrieveSegmentsActionsTest.java       |   3 +-
 .../RetrieveUsedSegmentsActionSerdeTest.java       |   4 +-
 .../indexing/common/config/TaskConfigBuilder.java  |  16 +-
 .../concurrent/ConcurrentReplaceAndAppendTest.java |   3 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |   1 -
 18 files changed, 265 insertions(+), 300 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 9a1dd089cfc..f31f66c5ef3 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -69,7 +69,7 @@ import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
-import 
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
@@ -1233,10 +1233,15 @@ public class ControllerImpl implements Controller
       // any segment created after the lock was acquired for its interval will 
not be considered.
       final Collection<DataSegment> publishedUsedSegments;
       try {
-        publishedUsedSegments = context.taskActionClient().submit(new 
RetrieveSegmentsToReplaceAction(
-            dataSource,
-            intervals
-        ));
+        // Additional check as the task action does not accept empty intervals
+        if (intervals.isEmpty()) {
+          publishedUsedSegments = Collections.emptySet();
+        } else {
+          publishedUsedSegments = context.taskActionClient().submit(new 
RetrieveUsedSegmentsAction(
+              dataSource,
+              intervals
+          ));
+        }
       }
       catch (IOException e) {
         throw new MSQException(e, UnknownFault.forException(e));
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index f8100dc8a8f..ea7adc866ee 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -98,6 +99,28 @@ public class MSQReplaceTest extends MSQTestBase
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doCallRealMethod()
+           .doReturn(ImmutableSet.of(existingDataSegment0, 
existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo"),
+               EasyMock.eq(ImmutableList.of(Intervals.ETERNITY))
+           ));
+
     testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL "
                              + "SELECT __time, m1 "
                              + "FROM foo "
@@ -418,6 +441,28 @@ public class MSQReplaceTest extends MSQTestBase
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doCallRealMethod()
+           .doReturn(ImmutableSet.of(existingDataSegment0, 
existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo"),
+               EasyMock.eq(ImmutableList.of(Intervals.ETERNITY))
+           ));
+
+
     testIngestQuery().setSql(" REPLACE INTO foo "
                              + "OVERWRITE ALL "
                              + "SELECT __time, m1 "
@@ -479,6 +524,20 @@ public class MSQReplaceTest extends MSQTestBase
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doReturn(ImmutableSet.of(existingDataSegment0))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo"),
+               
EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01")))
+           ));
+
     testIngestQuery().setSql(" REPLACE INTO foo "
                              + "OVERWRITE WHERE __time >= TIMESTAMP 
'2000-01-01' AND __time < TIMESTAMP '2000-03-01' "
                              + "SELECT __time, m1 "
@@ -538,6 +597,28 @@ public class MSQReplaceTest extends MSQTestBase
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doReturn(ImmutableSet.of(existingDataSegment0, 
existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo"),
+               
EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2002-01-01")))
+           ));
+
+
     testIngestQuery().setSql(" REPLACE INTO foo "
                              + "OVERWRITE WHERE __time >= TIMESTAMP 
'2000-01-01' AND __time < TIMESTAMP '2002-01-01' "
                              + "SELECT __time, m1 "
@@ -625,6 +706,19 @@ public class MSQReplaceTest extends MSQTestBase
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
+    final DataSegment existingDataSegment = DataSegment.builder()
+                                                       .dataSource("foo")
+                                                       
.interval(Intervals.of("2000-01-01/2000-01-04"))
+                                                       
.version(MSQTestTaskActionClient.VERSION)
+                                                       .size(1)
+                                                       .build();
+    Mockito.doReturn(ImmutableSet.of(existingDataSegment))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo"),
+               
EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01")))
+           ));
+
     testIngestQuery().setSql(" REPLACE INTO foo "
                              + "OVERWRITE WHERE __time >= TIMESTAMP 
'2000-01-01' AND __time < TIMESTAMP '2000-03-01'"
                              + "SELECT __time, m1 "
@@ -659,6 +753,26 @@ public class MSQReplaceTest extends MSQTestBase
                                             .add("m1", ColumnType.FLOAT)
                                             .build();
 
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doReturn(ImmutableSet.of(existingDataSegment0, 
existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo"),
+               EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002")))
+           ));
+
     testIngestQuery().setSql(" REPLACE INTO foo "
                              + "OVERWRITE WHERE __time >= TIMESTAMP 
'2000-01-01' AND __time < TIMESTAMP '2002-01-01'"
                              + "SELECT __time, m1 "
@@ -939,6 +1053,26 @@ public class MSQReplaceTest extends MSQTestBase
                                             .add("d", ColumnType.STRING)
                                             .build();
 
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doReturn(ImmutableSet.of(existingDataSegment0, 
existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo"),
+               EasyMock.eq(ImmutableList.of(Intervals.of("1999/2002")))
+           ));
+
     testIngestQuery().setSql(" REPLACE INTO foo "
                              + "OVERWRITE WHERE __time >= TIMESTAMP 
'1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'"
                              + "SELECT __time, d "
@@ -1003,7 +1137,10 @@ public class MSQReplaceTest extends MSQTestBase
 
     Mockito.doReturn(ImmutableSet.of(existingDataSegment))
            .when(testTaskActionClient)
-           .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class));
+           .submit(new RetrieveUsedSegmentsAction(
+               EasyMock.eq("foo1"),
+               EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002")))
+           ));
 
     List<Object[]> expectedResults;
     if (NullHandling.sqlCompatible()) {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
index 5192aafccdc..fd0452cce7c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
@@ -21,13 +21,10 @@ package org.apache.druid.msq.test;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
-import 
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
@@ -47,8 +44,6 @@ import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -60,10 +55,6 @@ public class MSQTestTaskActionClient implements 
TaskActionClient
   public static final String VERSION = "test";
   private final ObjectMapper mapper;
   private final ConcurrentHashMap<SegmentId, AtomicInteger> 
segmentIdPartitionIdMap = new ConcurrentHashMap<>();
-  private final Map<String, List<Interval>> usedIntervals = ImmutableMap.of(
-      "foo", ImmutableList.of(Intervals.of("2001-01-01/2001-01-04"), 
Intervals.of("2000-01-01/2000-01-04")),
-      "foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D"))
-  );
   private final Set<DataSegment> publishedSegments = new HashSet<>();
   private final Injector injector;
 
@@ -115,21 +106,6 @@ public class MSQTestTaskActionClient implements 
TaskActionClient
       ));
     } else if (taskAction instanceof RetrieveUsedSegmentsAction) {
       String dataSource = ((RetrieveUsedSegmentsAction) 
taskAction).getDataSource();
-      if (!usedIntervals.containsKey(dataSource)) {
-        return (RetType) ImmutableSet.of();
-      } else {
-        return (RetType) usedIntervals.get(dataSource)
-                                      .stream()
-                                      .map(interval -> DataSegment.builder()
-                                                                 
.dataSource(dataSource)
-                                                                 
.interval(interval)
-                                                                 
.version(VERSION)
-                                                                 .size(1)
-                                                                 .build()
-                                     ).collect(Collectors.toSet());
-      }
-    } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {
-      String dataSource = ((RetrieveSegmentsToReplaceAction) 
taskAction).getDataSource();
       return (RetType) 
injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
                                .getSegments()
                                .stream()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
index 984058895e3..3a33bc80d68 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.appenderator;
 import com.google.common.collect.Iterables;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
@@ -66,7 +65,7 @@ public class ActionBasedUsedSegmentChecker implements 
UsedSegmentChecker
       );
 
       final Collection<DataSegment> usedSegmentsForIntervals = taskActionClient
-          .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, 
Segments.ONLY_VISIBLE));
+          .submit(new RetrieveUsedSegmentsAction(dataSource, intervals));
 
       for (DataSegment segment : usedSegmentsForIntervals) {
         if (segmentIdsInDataSource.contains(segment.getId())) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
deleted file mode 100644
index 7fec3369a82..00000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.actions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.druid.indexing.common.task.Task;
-import 
org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
-import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.ReplaceTaskLock;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.Partitions;
-import org.apache.druid.timeline.SegmentTimeline;
-import org.joda.time.Interval;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * This action exists in addition to retrieveUsedSegmentsAction because that 
action suffers
- * from a race condition described by the following sequence of events:
- *
- * -Segments S1, S2, S3 exist
- * -Compact acquires a replace lock
- * -A concurrent appending job publishes a segment S4 which needs to be 
upgraded to the replace lock's version
- * -Compact task processes S1-S4 to create new segments
- * -Compact task publishes new segments and carries S4 forward to the new 
version
- *
- * This can lead to the data in S4 being duplicated
- *
- * This TaskAction returns a collection of segments which have data within the 
specified interval and are marked as
- * used, and have been created before a REPLACE lock, if any, was acquired.
- * This ensures that a consistent set of segments is returned each time this 
action is called
- */
-public class RetrieveSegmentsToReplaceAction implements 
TaskAction<Collection<DataSegment>>
-{
-  private static final Logger log = new 
Logger(RetrieveSegmentsToReplaceAction.class);
-
-  private final String dataSource;
-
-  private final List<Interval> intervals;
-
-  @JsonCreator
-  public RetrieveSegmentsToReplaceAction(
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("intervals") List<Interval> intervals
-  )
-  {
-    this.dataSource = dataSource;
-    this.intervals = intervals;
-  }
-
-  @JsonProperty
-  public String getDataSource()
-  {
-    return dataSource;
-  }
-
-  @JsonProperty
-  public List<Interval> getIntervals()
-  {
-    return intervals;
-  }
-
-  @Override
-  public TypeReference<Collection<DataSegment>> getReturnTypeReference()
-  {
-    return new TypeReference<Collection<DataSegment>>() {};
-  }
-
-  @Override
-  public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
-  {
-    // The DruidInputSource can be used to read from one datasource and write 
to another.
-    // In such a case, the race condition described in the class-level docs 
cannot occur,
-    // and the action can simply fetch all visible segments for the datasource 
and interval
-    if (!task.getDataSource().equals(dataSource)) {
-      return retrieveAllVisibleSegments(toolbox);
-    }
-
-    final String supervisorId;
-    if (task instanceof AbstractBatchSubtask) {
-      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
-    } else {
-      supervisorId = task.getId();
-    }
-
-    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
-        .getTaskLockbox()
-        .getAllReplaceLocksForDatasource(task.getDataSource())
-        .stream()
-        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
-        .collect(Collectors.toSet());
-
-    // If there are no replace locks for the task, simply fetch all visible 
segments for the interval
-    if (replaceLocksForTask.isEmpty()) {
-      return retrieveAllVisibleSegments(toolbox);
-    }
-
-    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = 
new HashMap<>();
-    for (Pair<DataSegment, String> segmentAndCreatedDate :
-        
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
 intervals)) {
-      final DataSegment segment = segmentAndCreatedDate.lhs;
-      final String created = segmentAndCreatedDate.rhs;
-      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> 
new HashMap<>())
-                                 .computeIfAbsent(created, c -> new 
HashSet<>())
-                                 .add(segment);
-    }
-
-    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
-    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : 
intervalToCreatedToSegments.entrySet()) {
-      final Interval segmentInterval = entry.getKey();
-      String lockVersion = null;
-      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
-        if (replaceLock.getInterval().contains(segmentInterval)) {
-          lockVersion = replaceLock.getVersion();
-        }
-      }
-      final Map<String, Set<DataSegment>> createdToSegmentsMap = 
entry.getValue();
-      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : 
createdToSegmentsMap.entrySet()) {
-        if (lockVersion == null || 
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
-          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
-        } else {
-          for (DataSegment segment : createdAndSegments.getValue()) {
-            log.info("Ignoring segment[%s] as it has created_date[%s] greater 
than the REPLACE lock version[%s]",
-                     segment.getId(), createdAndSegments.getKey(), 
lockVersion);
-          }
-        }
-      }
-    }
-
-    return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
-                          
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, 
Partitions.ONLY_COMPLETE);
-  }
-
-  private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox 
toolbox)
-  {
-    return toolbox.getIndexerMetadataStorageCoordinator()
-                  .retrieveUsedSegmentsForIntervals(dataSource, intervals, 
Segments.ONLY_VISIBLE);
-  }
-
-  @Override
-  public boolean isAudited()
-  {
-    return false;
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;
-    return Objects.equals(dataSource, that.dataSource) && 
Objects.equals(intervals, that.intervals);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(dataSource, intervals);
-  }
-  @Override
-  public String toString()
-  {
-    return "RetrieveSegmentsToReplaceAction{" +
-           "dataSource='" + dataSource + '\'' +
-           ", intervals=" + intervals +
-           '}';
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java
index fab8c484689..29986eeba55 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java
@@ -26,29 +26,47 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.task.Task;
+import 
org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
 import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This TaskAction returns a collection of segments which have data within the specified intervals and are marked as
  * used.
+ * If the task holds REPLACE locks and is writing back to the same datasource,
+ * only segments that were created before the REPLACE lock was acquired are returned for an interval.
+ * This ensures that the input set of segments for this replace task remains consistent
+ * even when new data is appended by other concurrent tasks.
  *
  * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
  * the collection only once.
  *
- * @implNote This action doesn't produce a {@link java.util.Set} because it's implemented via {@link
+ * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
  * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
- * a collection. Producing a {@link java.util.Set} would require an unnecessary copy of segments collection.
+ * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
  */
 public class RetrieveUsedSegmentsAction implements 
TaskAction<Collection<DataSegment>>
 {
+  private static final Logger log = new 
Logger(RetrieveUsedSegmentsAction.class);
+
   @JsonIgnore
   private final String dataSource;
 
@@ -87,6 +105,11 @@ public class RetrieveUsedSegmentsAction implements 
TaskAction<Collection<DataSeg
     this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE;
   }
 
+  public RetrieveUsedSegmentsAction(String dataSource, Collection<Interval> 
intervals)
+  {
+    this(dataSource, null, intervals, Segments.ONLY_VISIBLE);
+  }
+
   @JsonProperty
   public String getDataSource()
   {
@@ -113,6 +136,75 @@ public class RetrieveUsedSegmentsAction implements 
TaskAction<Collection<DataSeg
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // When fetching segments for a datasource other than the one this task is 
writing to,
+    // just return all segments with the needed visibility.
+    // This is because we can't ensure that the set of returned segments is 
consistent throughout the task's lifecycle
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible 
segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = 
new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
 intervals)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String createdDate = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> 
new HashMap<>())
+                                 .computeIfAbsent(createdDate, c -> new 
HashSet<>())
+                                 .add(segment);
+    }
+
+    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : 
intervalToCreatedToSegments.entrySet()) {
+      final Interval segmentInterval = entry.getKey();
+      String lockVersion = null;
+      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+        if (replaceLock.getInterval().contains(segmentInterval)) {
+          lockVersion = replaceLock.getVersion();
+          break;
+        }
+      }
+      final Map<String, Set<DataSegment>> createdToSegmentsMap = 
entry.getValue();
+      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : 
createdToSegmentsMap.entrySet()) {
+        if (lockVersion == null || 
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+        } else {
+          for (DataSegment segment : createdAndSegments.getValue()) {
+            log.info("Ignoring segment[%s] as it has created_date[%s] greater 
than the REPLACE lock version[%s]",
+                     segment.getId(), createdAndSegments.getKey(), 
lockVersion);
+          }
+        }
+      }
+    }
+
+    if (visibility == Segments.ONLY_VISIBLE) {
+      return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+                            
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, 
Partitions.ONLY_COMPLETE);
+    } else {
+      return allSegmentsToBeReplaced;
+    }
+  }
+
+  private Collection<DataSegment> retrieveUsedSegments(TaskActionToolbox 
toolbox)
   {
     return toolbox.getIndexerMetadataStorageCoordinator()
                   .retrieveUsedSegmentsForIntervals(dataSource, intervals, 
visibility);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index e251626f869..171d53b9cdd 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,7 +38,6 @@ import java.util.concurrent.Future;
     @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = 
SegmentTransactionalInsertAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = 
SegmentTransactionalAppendAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = 
SegmentTransactionalReplaceAction.class),
-    @JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = 
RetrieveSegmentsToReplaceAction.class),
     // Type name doesn't correspond to the name of the class for backward 
compatibility.
     @JsonSubTypes.Type(name = "segmentListUsed", value = 
RetrieveUsedSegmentsAction.class),
     // Type name doesn't correspond to the name of the class for backward 
compatibility.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 3352735b9e0..db48d6f07f7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -79,7 +79,6 @@ public class TaskConfig
   private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new 
Period("PT5M");
   private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true;
   private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1;
-  private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE = 
false;
 
   @JsonProperty
   private final String baseDir;
@@ -126,9 +125,6 @@ public class TaskConfig
   @JsonProperty
   private final long tmpStorageBytesPerTask;
 
-  @JsonProperty
-  private final boolean enableConcurrentAppendAndReplace;
-
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -146,8 +142,7 @@ public class TaskConfig
       @JsonProperty("batchProcessingMode") String batchProcessingMode,
       @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
       @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
-      @JsonProperty("tmpStorageBytesPerTask") @Nullable Long 
tmpStorageBytesPerTask,
-      @JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean 
enableConcurrentAppendAndReplace
+      @JsonProperty("tmpStorageBytesPerTask") @Nullable Long 
tmpStorageBytesPerTask
   )
   {
     this.baseDir = Configs.valueOrDefault(baseDir, 
System.getProperty("java.io.tmpdir"));
@@ -198,10 +193,6 @@ public class TaskConfig
 
     this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, 
DEFAULT_STORE_EMPTY_COLUMNS);
     this.tmpStorageBytesPerTask = 
Configs.valueOrDefault(tmpStorageBytesPerTask, 
DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
-    this.enableConcurrentAppendAndReplace = Configs.valueOrDefault(
-        enableConcurrentAppendAndReplace,
-        DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE
-    );
   }
 
   private TaskConfig(
@@ -219,8 +210,7 @@ public class TaskConfig
       BatchProcessingMode batchProcessingMode,
       boolean storeEmptyColumns,
       boolean encapsulatedTask,
-      long tmpStorageBytesPerTask,
-      boolean enableConcurrentAppendAndReplace
+      long tmpStorageBytesPerTask
   )
   {
     this.baseDir = baseDir;
@@ -238,7 +228,6 @@ public class TaskConfig
     this.storeEmptyColumns = storeEmptyColumns;
     this.encapsulatedTask = encapsulatedTask;
     this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
-    this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace;
   }
 
   @JsonProperty
@@ -355,12 +344,6 @@ public class TaskConfig
     return tmpStorageBytesPerTask;
   }
 
-  @JsonProperty("enableConcurrentAppendAndReplace")
-  public boolean isConcurrentAppendAndReplaceEnabled()
-  {
-    return enableConcurrentAppendAndReplace;
-  }
-
   private String defaultDir(@Nullable String configParameter, final String 
defaultVal)
   {
     if (configParameter == null) {
@@ -387,8 +370,7 @@ public class TaskConfig
         batchProcessingMode,
         storeEmptyColumns,
         encapsulatedTask,
-        tmpStorageBytesPerTask,
-        enableConcurrentAppendAndReplace
+        tmpStorageBytesPerTask
     );
   }
 
@@ -409,8 +391,7 @@ public class TaskConfig
         batchProcessingMode,
         storeEmptyColumns,
         encapsulatedTask,
-        tmpStorageBytesPerTask,
-        enableConcurrentAppendAndReplace
+        tmpStorageBytesPerTask
     );
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 94110e167e3..4a76e688fb7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -48,7 +48,6 @@ import 
org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededExcept
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.indexing.input.InputRowSchemas;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -655,7 +654,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
   {
     return ImmutableList.copyOf(
         actionClient.submit(
-            new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, 
Segments.ONLY_VISIBLE)
+            new RetrieveUsedSegmentsAction(dataSource, intervalsToRead)
         )
     );
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 1a0e5c971fd..6959de2809d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -60,7 +60,6 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngesti
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.indexing.input.DruidInputSource;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -428,7 +427,7 @@ public class CompactionTask extends AbstractBatchIndexTask
       throws IOException
   {
     return ImmutableList.copyOf(
-        taskActionClient.submit(new 
RetrieveUsedSegmentsAction(getDataSource(), null, intervals, 
Segments.ONLY_VISIBLE))
+        taskActionClient.submit(new 
RetrieveUsedSegmentsAction(getDataSource(), intervals))
     );
   }
 
@@ -1163,7 +1162,9 @@ public class CompactionTask extends AbstractBatchIndexTask
     List<DataSegment> findSegments(TaskActionClient actionClient) throws 
IOException
     {
       return new ArrayList<>(
-          actionClient.submit(new RetrieveUsedSegmentsAction(dataSource, 
interval, null, Segments.ONLY_VISIBLE))
+          actionClient.submit(
+              new RetrieveUsedSegmentsAction(dataSource, 
ImmutableList.of(interval))
+          )
       );
     }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
index 6b3c684e794..15ba6788307 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
@@ -26,7 +26,6 @@ import 
org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -309,9 +308,7 @@ public class TombstoneHelper
       Collection<DataSegment> usedSegmentsInInputInterval =
           taskActionClient.submit(new RetrieveUsedSegmentsAction(
               dataSource,
-              null,
-              condensedInputIntervals,
-              Segments.ONLY_VISIBLE
+              condensedInputIntervals
           ));
       for (DataSegment usedSegment : usedSegmentsInInputInterval) {
         for (Interval condensedInputInterval : condensedInputIntervals) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 85617728e5e..890a7c313fa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -49,7 +49,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import 
org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
-import 
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.java.util.common.CloseableIterators;
@@ -546,7 +546,7 @@ public class DruidInputSource extends AbstractInputSource 
implements SplittableI
     Preconditions.checkNotNull(interval);
 
     final Collection<DataSegment> usedSegments;
-    if (toolbox == null || 
!toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) {
+    if (toolbox == null) {
       usedSegments = FutureUtils.getUnchecked(
           coordinatorClient.fetchUsedSegments(dataSource, 
Collections.singletonList(interval)),
           true
@@ -554,7 +554,10 @@ public class DruidInputSource extends AbstractInputSource 
implements SplittableI
     } else {
       try {
         usedSegments = toolbox.getTaskActionClient()
-                              .submit(new 
RetrieveSegmentsToReplaceAction(dataSource, 
Collections.singletonList(interval)));
+                              .submit(new RetrieveUsedSegmentsAction(
+                                  dataSource,
+                                  Collections.singletonList(interval)
+                              ));
       }
       catch (IOException e) {
         LOG.error(e, "Error retrieving the used segments for interval[%s].", 
interval);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
index 35a28be6cbd..c339a103b2d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
@@ -44,7 +43,7 @@ public class ActionBasedUsedSegmentCheckerTest
     final TaskActionClient taskActionClient = 
EasyMock.createMock(TaskActionClient.class);
     EasyMock.expect(
         taskActionClient.submit(
-            new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"), 
null, Segments.ONLY_VISIBLE)
+            new RetrieveUsedSegmentsAction("bar", 
ImmutableList.of(Intervals.of("2002/P1D")))
         )
     ).andReturn(
         ImmutableList.of(
@@ -68,9 +67,7 @@ public class ActionBasedUsedSegmentCheckerTest
         taskActionClient.submit(
             new RetrieveUsedSegmentsAction(
                 "foo",
-                null,
-                ImmutableList.of(Intervals.of("2000/P1D"), 
Intervals.of("2001/P1D")),
-                Segments.ONLY_VISIBLE
+                ImmutableList.of(Intervals.of("2000/P1D"), 
Intervals.of("2001/P1D"))
             )
         )
     ).andReturn(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
index 0fbfb1733a8..2dee594aad6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.timeline.DataSegment;
@@ -98,7 +97,7 @@ public class RetrieveSegmentsActionsTest
   public void testRetrieveUsedSegmentsAction()
   {
     final RetrieveUsedSegmentsAction action =
-        new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null, 
Segments.ONLY_VISIBLE);
+        new RetrieveUsedSegmentsAction(task.getDataSource(), 
ImmutableList.of(INTERVAL));
     final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, 
actionTestKit.getTaskActionToolbox()));
     Assert.assertEquals(expectedUsedSegments, resultSegments);
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
index 0876092cac1..99675fd57bb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
@@ -56,9 +56,7 @@ public class RetrieveUsedSegmentsActionSerdeTest
     List<Interval> intervals = ImmutableList.of(Intervals.of("2014/2015"), 
Intervals.of("2016/2017"));
     RetrieveUsedSegmentsAction expected = new RetrieveUsedSegmentsAction(
         "dataSource",
-        null,
-        intervals,
-        Segments.ONLY_VISIBLE
+        intervals
     );
 
     RetrieveUsedSegmentsAction actual =
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
index 8b488fff809..af920ebbeb7 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
@@ -41,7 +41,6 @@ public class TaskConfigBuilder
   private Boolean storeEmptyColumns;
   private boolean enableTaskLevelLogPush;
   private Long tmpStorageBytesPerTask;
-  private Boolean enableConcurrentAppendAndReplace;
 
   public TaskConfigBuilder setBaseDir(String baseDir)
   {
@@ -133,18 +132,6 @@ public class TaskConfigBuilder
     return this;
   }
 
-  public TaskConfigBuilder enableConcurrentAppendAndReplace()
-  {
-    this.enableConcurrentAppendAndReplace = true;
-    return this;
-  }
-
-  public TaskConfigBuilder disableConcurrentAppendAndReplace()
-  {
-    this.enableConcurrentAppendAndReplace = false;
-    return this;
-  }
-
   public TaskConfig build()
   {
     return new TaskConfig(
@@ -162,8 +149,7 @@ public class TaskConfigBuilder
         batchProcessingMode,
         storeEmptyColumns,
         enableTaskLevelLogPush,
-        tmpStorageBytesPerTask,
-        enableConcurrentAppendAndReplace
+        tmpStorageBytesPerTask
     );
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 7f83a8f0233..3fda953a454 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -27,7 +27,6 @@ import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskStorageDirTracker;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
-import 
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -955,7 +954,7 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     try {
       final TaskActionClient taskActionClient = 
taskActionClientFactory.create(task);
       Collection<DataSegment> allUsedSegments = taskActionClient.submit(
-          new RetrieveSegmentsToReplaceAction(
+          new RetrieveUsedSegmentsAction(
               WIKI,
               Collections.singletonList(interval)
           )
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index b0b347e25a8..002c70262cb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -616,7 +616,6 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
         .setDefaultRowFlushBoundary(50000)
         
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
         .setTmpStorageBytesPerTask(-1L)
-        .enableConcurrentAppendAndReplace()
         .build();
 
     return new TaskToolboxFactory(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to