This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 54d0e482dc5 Consolidate RetrieveSegmentsToReplaceAction into
RetrieveUsedSegmentsAction (#15699)
54d0e482dc5 is described below
commit 54d0e482dc560f42194c613d1e1bcd5310f281c2
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Jan 29 19:18:43 2024 +0530
Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction
(#15699)
Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 15 +-
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 139 +++++++++++++-
.../druid/msq/test/MSQTestTaskActionClient.java | 24 ---
.../ActionBasedUsedSegmentChecker.java | 3 +-
.../actions/RetrieveSegmentsToReplaceAction.java | 202 ---------------------
.../common/actions/RetrieveUsedSegmentsAction.java | 96 +++++++++-
.../druid/indexing/common/actions/TaskAction.java | 1 -
.../druid/indexing/common/config/TaskConfig.java | 27 +--
.../common/task/AbstractBatchIndexTask.java | 3 +-
.../druid/indexing/common/task/CompactionTask.java | 7 +-
.../task/batch/parallel/TombstoneHelper.java | 5 +-
.../druid/indexing/input/DruidInputSource.java | 9 +-
.../ActionBasedUsedSegmentCheckerTest.java | 7 +-
.../actions/RetrieveSegmentsActionsTest.java | 3 +-
.../RetrieveUsedSegmentsActionSerdeTest.java | 4 +-
.../indexing/common/config/TaskConfigBuilder.java | 16 +-
.../concurrent/ConcurrentReplaceAndAppendTest.java | 3 +-
.../druid/indexing/overlord/TaskLifecycleTest.java | 1 -
18 files changed, 265 insertions(+), 300 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 9a1dd089cfc..f31f66c5ef3 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -69,7 +69,7 @@ import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
-import
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
@@ -1233,10 +1233,15 @@ public class ControllerImpl implements Controller
// any segment created after the lock was acquired for its interval will
not be considered.
final Collection<DataSegment> publishedUsedSegments;
try {
- publishedUsedSegments = context.taskActionClient().submit(new
RetrieveSegmentsToReplaceAction(
- dataSource,
- intervals
- ));
+ // Additional check as the task action does not accept empty intervals
+ if (intervals.isEmpty()) {
+ publishedUsedSegments = Collections.emptySet();
+ } else {
+ publishedUsedSegments = context.taskActionClient().submit(new
RetrieveUsedSegmentsAction(
+ dataSource,
+ intervals
+ ));
+ }
}
catch (IOException e) {
throw new MSQException(e, UnknownFault.forException(e));
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index f8100dc8a8f..ea7adc866ee 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -98,6 +99,28 @@ public class MSQReplaceTest extends MSQTestBase
.add("m1", ColumnType.FLOAT)
.build();
+ DataSegment existingDataSegment0 = DataSegment.builder()
+
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ DataSegment existingDataSegment1 = DataSegment.builder()
+
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ Mockito.doCallRealMethod()
+ .doReturn(ImmutableSet.of(existingDataSegment0,
existingDataSegment1))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo"),
+ EasyMock.eq(ImmutableList.of(Intervals.ETERNITY))
+ ));
+
testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL "
+ "SELECT __time, m1 "
+ "FROM foo "
@@ -418,6 +441,28 @@ public class MSQReplaceTest extends MSQTestBase
.add("m1", ColumnType.FLOAT)
.build();
+ DataSegment existingDataSegment0 = DataSegment.builder()
+
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+ DataSegment existingDataSegment1 = DataSegment.builder()
+
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ Mockito.doCallRealMethod()
+ .doReturn(ImmutableSet.of(existingDataSegment0,
existingDataSegment1))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo"),
+ EasyMock.eq(ImmutableList.of(Intervals.ETERNITY))
+ ));
+
+
testIngestQuery().setSql(" REPLACE INTO foo "
+ "OVERWRITE ALL "
+ "SELECT __time, m1 "
@@ -479,6 +524,20 @@ public class MSQReplaceTest extends MSQTestBase
.add("m1", ColumnType.FLOAT)
.build();
+ DataSegment existingDataSegment0 = DataSegment.builder()
+
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ Mockito.doReturn(ImmutableSet.of(existingDataSegment0))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo"),
+
EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01")))
+ ));
+
testIngestQuery().setSql(" REPLACE INTO foo "
+ "OVERWRITE WHERE __time >= TIMESTAMP
'2000-01-01' AND __time < TIMESTAMP '2000-03-01' "
+ "SELECT __time, m1 "
@@ -538,6 +597,28 @@ public class MSQReplaceTest extends MSQTestBase
.add("m1", ColumnType.FLOAT)
.build();
+ DataSegment existingDataSegment0 = DataSegment.builder()
+
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ DataSegment existingDataSegment1 = DataSegment.builder()
+
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ Mockito.doReturn(ImmutableSet.of(existingDataSegment0,
existingDataSegment1))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo"),
+
EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2002-01-01")))
+ ));
+
+
testIngestQuery().setSql(" REPLACE INTO foo "
+ "OVERWRITE WHERE __time >= TIMESTAMP
'2000-01-01' AND __time < TIMESTAMP '2002-01-01' "
+ "SELECT __time, m1 "
@@ -625,6 +706,19 @@ public class MSQReplaceTest extends MSQTestBase
.add("m1", ColumnType.FLOAT)
.build();
+ final DataSegment existingDataSegment = DataSegment.builder()
+ .dataSource("foo")
+
.interval(Intervals.of("2000-01-01/2000-01-04"))
+
.version(MSQTestTaskActionClient.VERSION)
+ .size(1)
+ .build();
+ Mockito.doReturn(ImmutableSet.of(existingDataSegment))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo"),
+
EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01")))
+ ));
+
testIngestQuery().setSql(" REPLACE INTO foo "
+ "OVERWRITE WHERE __time >= TIMESTAMP
'2000-01-01' AND __time < TIMESTAMP '2000-03-01'"
+ "SELECT __time, m1 "
@@ -659,6 +753,26 @@ public class MSQReplaceTest extends MSQTestBase
.add("m1", ColumnType.FLOAT)
.build();
+ DataSegment existingDataSegment0 = DataSegment.builder()
+
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+ DataSegment existingDataSegment1 = DataSegment.builder()
+
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ Mockito.doReturn(ImmutableSet.of(existingDataSegment0,
existingDataSegment1))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo"),
+ EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002")))
+ ));
+
testIngestQuery().setSql(" REPLACE INTO foo "
+ "OVERWRITE WHERE __time >= TIMESTAMP
'2000-01-01' AND __time < TIMESTAMP '2002-01-01'"
+ "SELECT __time, m1 "
@@ -939,6 +1053,26 @@ public class MSQReplaceTest extends MSQTestBase
.add("d", ColumnType.STRING)
.build();
+ DataSegment existingDataSegment0 = DataSegment.builder()
+
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+ DataSegment existingDataSegment1 = DataSegment.builder()
+
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ Mockito.doReturn(ImmutableSet.of(existingDataSegment0,
existingDataSegment1))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo"),
+ EasyMock.eq(ImmutableList.of(Intervals.of("1999/2002")))
+ ));
+
testIngestQuery().setSql(" REPLACE INTO foo "
+ "OVERWRITE WHERE __time >= TIMESTAMP
'1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'"
+ "SELECT __time, d "
@@ -1003,7 +1137,10 @@ public class MSQReplaceTest extends MSQTestBase
Mockito.doReturn(ImmutableSet.of(existingDataSegment))
.when(testTaskActionClient)
- .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class));
+ .submit(new RetrieveUsedSegmentsAction(
+ EasyMock.eq("foo1"),
+ EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002")))
+ ));
List<Object[]> expectedResults;
if (NullHandling.sqlCompatible()) {
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
index 5192aafccdc..fd0452cce7c 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
@@ -21,13 +21,10 @@ package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.LockListAction;
-import
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
@@ -47,8 +44,6 @@ import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,10 +55,6 @@ public class MSQTestTaskActionClient implements
TaskActionClient
public static final String VERSION = "test";
private final ObjectMapper mapper;
private final ConcurrentHashMap<SegmentId, AtomicInteger>
segmentIdPartitionIdMap = new ConcurrentHashMap<>();
- private final Map<String, List<Interval>> usedIntervals = ImmutableMap.of(
- "foo", ImmutableList.of(Intervals.of("2001-01-01/2001-01-04"),
Intervals.of("2000-01-01/2000-01-04")),
- "foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D"))
- );
private final Set<DataSegment> publishedSegments = new HashSet<>();
private final Injector injector;
@@ -115,21 +106,6 @@ public class MSQTestTaskActionClient implements
TaskActionClient
));
} else if (taskAction instanceof RetrieveUsedSegmentsAction) {
String dataSource = ((RetrieveUsedSegmentsAction)
taskAction).getDataSource();
- if (!usedIntervals.containsKey(dataSource)) {
- return (RetType) ImmutableSet.of();
- } else {
- return (RetType) usedIntervals.get(dataSource)
- .stream()
- .map(interval -> DataSegment.builder()
-
.dataSource(dataSource)
-
.interval(interval)
-
.version(VERSION)
- .size(1)
- .build()
- ).collect(Collectors.toSet());
- }
- } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {
- String dataSource = ((RetrieveSegmentsToReplaceAction)
taskAction).getDataSource();
return (RetType)
injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
.getSegments()
.stream()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
index 984058895e3..3a33bc80d68 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.appenderator;
import com.google.common.collect.Iterables;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
@@ -66,7 +65,7 @@ public class ActionBasedUsedSegmentChecker implements
UsedSegmentChecker
);
final Collection<DataSegment> usedSegmentsForIntervals = taskActionClient
- .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals,
Segments.ONLY_VISIBLE));
+ .submit(new RetrieveUsedSegmentsAction(dataSource, intervals));
for (DataSegment segment : usedSegmentsForIntervals) {
if (segmentIdsInDataSource.contains(segment.getId())) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
deleted file mode 100644
index 7fec3369a82..00000000000
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.actions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.druid.indexing.common.task.Task;
-import
org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
-import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.ReplaceTaskLock;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.Partitions;
-import org.apache.druid.timeline.SegmentTimeline;
-import org.joda.time.Interval;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * This action exists in addition to retrieveUsedSegmentsAction because that
action suffers
- * from a race condition described by the following sequence of events:
- *
- * -Segments S1, S2, S3 exist
- * -Compact acquires a replace lock
- * -A concurrent appending job publishes a segment S4 which needs to be
upgraded to the replace lock's version
- * -Compact task processes S1-S4 to create new segments
- * -Compact task publishes new segments and carries S4 forward to the new
version
- *
- * This can lead to the data in S4 being duplicated
- *
- * This TaskAction returns a collection of segments which have data within the
specified interval and are marked as
- * used, and have been created before a REPLACE lock, if any, was acquired.
- * This ensures that a consistent set of segments is returned each time this
action is called
- */
-public class RetrieveSegmentsToReplaceAction implements
TaskAction<Collection<DataSegment>>
-{
- private static final Logger log = new
Logger(RetrieveSegmentsToReplaceAction.class);
-
- private final String dataSource;
-
- private final List<Interval> intervals;
-
- @JsonCreator
- public RetrieveSegmentsToReplaceAction(
- @JsonProperty("dataSource") String dataSource,
- @JsonProperty("intervals") List<Interval> intervals
- )
- {
- this.dataSource = dataSource;
- this.intervals = intervals;
- }
-
- @JsonProperty
- public String getDataSource()
- {
- return dataSource;
- }
-
- @JsonProperty
- public List<Interval> getIntervals()
- {
- return intervals;
- }
-
- @Override
- public TypeReference<Collection<DataSegment>> getReturnTypeReference()
- {
- return new TypeReference<Collection<DataSegment>>() {};
- }
-
- @Override
- public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
- {
- // The DruidInputSource can be used to read from one datasource and write
to another.
- // In such a case, the race condition described in the class-level docs
cannot occur,
- // and the action can simply fetch all visible segments for the datasource
and interval
- if (!task.getDataSource().equals(dataSource)) {
- return retrieveAllVisibleSegments(toolbox);
- }
-
- final String supervisorId;
- if (task instanceof AbstractBatchSubtask) {
- supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
- } else {
- supervisorId = task.getId();
- }
-
- final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
- .getTaskLockbox()
- .getAllReplaceLocksForDatasource(task.getDataSource())
- .stream()
- .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
- .collect(Collectors.toSet());
-
- // If there are no replace locks for the task, simply fetch all visible
segments for the interval
- if (replaceLocksForTask.isEmpty()) {
- return retrieveAllVisibleSegments(toolbox);
- }
-
- Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments =
new HashMap<>();
- for (Pair<DataSegment, String> segmentAndCreatedDate :
-
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
intervals)) {
- final DataSegment segment = segmentAndCreatedDate.lhs;
- final String created = segmentAndCreatedDate.rhs;
- intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s ->
new HashMap<>())
- .computeIfAbsent(created, c -> new
HashSet<>())
- .add(segment);
- }
-
- Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
- for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry :
intervalToCreatedToSegments.entrySet()) {
- final Interval segmentInterval = entry.getKey();
- String lockVersion = null;
- for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
- if (replaceLock.getInterval().contains(segmentInterval)) {
- lockVersion = replaceLock.getVersion();
- }
- }
- final Map<String, Set<DataSegment>> createdToSegmentsMap =
entry.getValue();
- for (Map.Entry<String, Set<DataSegment>> createdAndSegments :
createdToSegmentsMap.entrySet()) {
- if (lockVersion == null ||
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
- allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
- } else {
- for (DataSegment segment : createdAndSegments.getValue()) {
- log.info("Ignoring segment[%s] as it has created_date[%s] greater
than the REPLACE lock version[%s]",
- segment.getId(), createdAndSegments.getKey(),
lockVersion);
- }
- }
- }
- }
-
- return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
-
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY,
Partitions.ONLY_COMPLETE);
- }
-
- private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox
toolbox)
- {
- return toolbox.getIndexerMetadataStorageCoordinator()
- .retrieveUsedSegmentsForIntervals(dataSource, intervals,
Segments.ONLY_VISIBLE);
- }
-
- @Override
- public boolean isAudited()
- {
- return false;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;
- return Objects.equals(dataSource, that.dataSource) &&
Objects.equals(intervals, that.intervals);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(dataSource, intervals);
- }
- @Override
- public String toString()
- {
- return "RetrieveSegmentsToReplaceAction{" +
- "dataSource='" + dataSource + '\'' +
- ", intervals=" + intervals +
- '}';
- }
-}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java
index fab8c484689..29986eeba55 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java
@@ -26,29 +26,47 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.task.Task;
+import
org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* This TaskAction returns a collection of segments which have data within the
specified intervals and are marked as
* used.
+ * If the task holds REPLACE locks and is writing back to the same datasource,
+ * only segments that were created before the REPLACE lock was acquired are
returned for an interval.
+ * This ensures that the input set of segments for this replace task remains
consistent
+ * even when new data is appended by other concurrent tasks.
*
* The order of segments within the returned collection is unspecified, but
each segment is guaranteed to appear in
* the collection only once.
*
- * @implNote This action doesn't produce a {@link java.util.Set} because it's
implemented via {@link
+ * @implNote This action doesn't produce a {@link Set} because it's
implemented via {@link
*
org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals}
which returns
- * a collection. Producing a {@link java.util.Set} would require an
unnecessary copy of segments collection.
+ * a collection. Producing a {@link Set} would require an unnecessary copy of
segments collection.
*/
public class RetrieveUsedSegmentsAction implements
TaskAction<Collection<DataSegment>>
{
+ private static final Logger log = new
Logger(RetrieveUsedSegmentsAction.class);
+
@JsonIgnore
private final String dataSource;
@@ -87,6 +105,11 @@ public class RetrieveUsedSegmentsAction implements
TaskAction<Collection<DataSeg
this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE;
}
+ public RetrieveUsedSegmentsAction(String dataSource, Collection<Interval>
intervals)
+ {
+ this(dataSource, null, intervals, Segments.ONLY_VISIBLE);
+ }
+
@JsonProperty
public String getDataSource()
{
@@ -113,6 +136,75 @@ public class RetrieveUsedSegmentsAction implements
TaskAction<Collection<DataSeg
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // When fetching segments for a datasource other than the one this task is
writing to,
+ // just return all segments with the needed visibility.
+ // This is because we can't ensure that the set of returned segments is
consistent throughout the task's lifecycle
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
+
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
+ // If there are no replace locks for the task, simply fetch all visible
segments for the interval
+ if (replaceLocksForTask.isEmpty()) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments =
new HashMap<>();
+ for (Pair<DataSegment, String> segmentAndCreatedDate :
+
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
intervals)) {
+ final DataSegment segment = segmentAndCreatedDate.lhs;
+ final String createdDate = segmentAndCreatedDate.rhs;
+ intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s ->
new HashMap<>())
+ .computeIfAbsent(createdDate, c -> new
HashSet<>())
+ .add(segment);
+ }
+
+ Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+ for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry :
intervalToCreatedToSegments.entrySet()) {
+ final Interval segmentInterval = entry.getKey();
+ String lockVersion = null;
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ if (replaceLock.getInterval().contains(segmentInterval)) {
+ lockVersion = replaceLock.getVersion();
+ break;
+ }
+ }
+ final Map<String, Set<DataSegment>> createdToSegmentsMap =
entry.getValue();
+ for (Map.Entry<String, Set<DataSegment>> createdAndSegments :
createdToSegmentsMap.entrySet()) {
+ if (lockVersion == null ||
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+ allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+ } else {
+ for (DataSegment segment : createdAndSegments.getValue()) {
+ log.info("Ignoring segment[%s] as it has created_date[%s] greater
than the REPLACE lock version[%s]",
+ segment.getId(), createdAndSegments.getKey(),
lockVersion);
+ }
+ }
+ }
+ }
+
+ if (visibility == Segments.ONLY_VISIBLE) {
+ return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY,
Partitions.ONLY_COMPLETE);
+ } else {
+ return allSegmentsToBeReplaced;
+ }
+ }
+
+ private Collection<DataSegment> retrieveUsedSegments(TaskActionToolbox
toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForIntervals(dataSource, intervals,
visibility);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index e251626f869..171d53b9cdd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,7 +38,6 @@ import java.util.concurrent.Future;
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value =
SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value =
SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value =
SegmentTransactionalReplaceAction.class),
- @JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value =
RetrieveSegmentsToReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward
compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value =
RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward
compatibility.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 3352735b9e0..db48d6f07f7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -79,7 +79,6 @@ public class TaskConfig
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new
Period("PT5M");
private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true;
private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1;
- private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE =
false;
@JsonProperty
private final String baseDir;
@@ -126,9 +125,6 @@ public class TaskConfig
@JsonProperty
private final long tmpStorageBytesPerTask;
- @JsonProperty
- private final boolean enableConcurrentAppendAndReplace;
-
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@@ -146,8 +142,7 @@ public class TaskConfig
@JsonProperty("batchProcessingMode") String batchProcessingMode,
@JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
- @JsonProperty("tmpStorageBytesPerTask") @Nullable Long
tmpStorageBytesPerTask,
- @JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean
enableConcurrentAppendAndReplace
+ @JsonProperty("tmpStorageBytesPerTask") @Nullable Long
tmpStorageBytesPerTask
)
{
this.baseDir = Configs.valueOrDefault(baseDir,
System.getProperty("java.io.tmpdir"));
@@ -198,10 +193,6 @@ public class TaskConfig
this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns,
DEFAULT_STORE_EMPTY_COLUMNS);
this.tmpStorageBytesPerTask =
Configs.valueOrDefault(tmpStorageBytesPerTask,
DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
- this.enableConcurrentAppendAndReplace = Configs.valueOrDefault(
- enableConcurrentAppendAndReplace,
- DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE
- );
}
private TaskConfig(
@@ -219,8 +210,7 @@ public class TaskConfig
BatchProcessingMode batchProcessingMode,
boolean storeEmptyColumns,
boolean encapsulatedTask,
- long tmpStorageBytesPerTask,
- boolean enableConcurrentAppendAndReplace
+ long tmpStorageBytesPerTask
)
{
this.baseDir = baseDir;
@@ -238,7 +228,6 @@ public class TaskConfig
this.storeEmptyColumns = storeEmptyColumns;
this.encapsulatedTask = encapsulatedTask;
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
- this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace;
}
@JsonProperty
@@ -355,12 +344,6 @@ public class TaskConfig
return tmpStorageBytesPerTask;
}
- @JsonProperty("enableConcurrentAppendAndReplace")
- public boolean isConcurrentAppendAndReplaceEnabled()
- {
- return enableConcurrentAppendAndReplace;
- }
-
private String defaultDir(@Nullable String configParameter, final String
defaultVal)
{
if (configParameter == null) {
@@ -387,8 +370,7 @@ public class TaskConfig
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask,
- tmpStorageBytesPerTask,
- enableConcurrentAppendAndReplace
+ tmpStorageBytesPerTask
);
}
@@ -409,8 +391,7 @@ public class TaskConfig
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask,
- tmpStorageBytesPerTask,
- enableConcurrentAppendAndReplace
+ tmpStorageBytesPerTask
);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 94110e167e3..4a76e688fb7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -48,7 +48,6 @@ import
org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededExcept
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
@@ -655,7 +654,7 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
{
return ImmutableList.copyOf(
actionClient.submit(
- new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead,
Segments.ONLY_VISIBLE)
+ new RetrieveUsedSegmentsAction(dataSource, intervalsToRead)
)
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 1a0e5c971fd..6959de2809d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -60,7 +60,6 @@ import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngesti
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -428,7 +427,7 @@ public class CompactionTask extends AbstractBatchIndexTask
throws IOException
{
return ImmutableList.copyOf(
- taskActionClient.submit(new
RetrieveUsedSegmentsAction(getDataSource(), null, intervals,
Segments.ONLY_VISIBLE))
+ taskActionClient.submit(new
RetrieveUsedSegmentsAction(getDataSource(), intervals))
);
}
@@ -1163,7 +1162,9 @@ public class CompactionTask extends AbstractBatchIndexTask
List<DataSegment> findSegments(TaskActionClient actionClient) throws
IOException
{
return new ArrayList<>(
- actionClient.submit(new RetrieveUsedSegmentsAction(dataSource,
interval, null, Segments.ONLY_VISIBLE))
+ actionClient.submit(
+ new RetrieveUsedSegmentsAction(dataSource,
ImmutableList.of(interval))
+ )
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
index 6b3c684e794..15ba6788307 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
@@ -26,7 +26,6 @@ import
org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
@@ -309,9 +308,7 @@ public class TombstoneHelper
Collection<DataSegment> usedSegmentsInInputInterval =
taskActionClient.submit(new RetrieveUsedSegmentsAction(
dataSource,
- null,
- condensedInputIntervals,
- Segments.ONLY_VISIBLE
+ condensedInputIntervals
));
for (DataSegment usedSegment : usedSegmentsInInputInterval) {
for (Interval condensedInputInterval : condensedInputIntervals) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 85617728e5e..890a7c313fa 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -49,7 +49,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import
org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
-import
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.java.util.common.CloseableIterators;
@@ -546,7 +546,7 @@ public class DruidInputSource extends AbstractInputSource
implements SplittableI
Preconditions.checkNotNull(interval);
final Collection<DataSegment> usedSegments;
- if (toolbox == null ||
!toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) {
+ if (toolbox == null) {
usedSegments = FutureUtils.getUnchecked(
coordinatorClient.fetchUsedSegments(dataSource,
Collections.singletonList(interval)),
true
@@ -554,7 +554,10 @@ public class DruidInputSource extends AbstractInputSource
implements SplittableI
} else {
try {
usedSegments = toolbox.getTaskActionClient()
- .submit(new
RetrieveSegmentsToReplaceAction(dataSource,
Collections.singletonList(interval)));
+ .submit(new RetrieveUsedSegmentsAction(
+ dataSource,
+ Collections.singletonList(interval)
+ ));
}
catch (IOException e) {
LOG.error(e, "Error retrieving the used segments for interval[%s].",
interval);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
index 35a28be6cbd..c339a103b2d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
@@ -44,7 +43,7 @@ public class ActionBasedUsedSegmentCheckerTest
final TaskActionClient taskActionClient =
EasyMock.createMock(TaskActionClient.class);
EasyMock.expect(
taskActionClient.submit(
- new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"),
null, Segments.ONLY_VISIBLE)
+ new RetrieveUsedSegmentsAction("bar",
ImmutableList.of(Intervals.of("2002/P1D")))
)
).andReturn(
ImmutableList.of(
@@ -68,9 +67,7 @@ public class ActionBasedUsedSegmentCheckerTest
taskActionClient.submit(
new RetrieveUsedSegmentsAction(
"foo",
- null,
- ImmutableList.of(Intervals.of("2000/P1D"),
Intervals.of("2001/P1D")),
- Segments.ONLY_VISIBLE
+ ImmutableList.of(Intervals.of("2000/P1D"),
Intervals.of("2001/P1D"))
)
)
).andReturn(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
index 0fbfb1733a8..2dee594aad6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
@@ -98,7 +97,7 @@ public class RetrieveSegmentsActionsTest
public void testRetrieveUsedSegmentsAction()
{
final RetrieveUsedSegmentsAction action =
- new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null,
Segments.ONLY_VISIBLE);
+ new RetrieveUsedSegmentsAction(task.getDataSource(),
ImmutableList.of(INTERVAL));
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task,
actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUsedSegments, resultSegments);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
index 0876092cac1..99675fd57bb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java
@@ -56,9 +56,7 @@ public class RetrieveUsedSegmentsActionSerdeTest
List<Interval> intervals = ImmutableList.of(Intervals.of("2014/2015"),
Intervals.of("2016/2017"));
RetrieveUsedSegmentsAction expected = new RetrieveUsedSegmentsAction(
"dataSource",
- null,
- intervals,
- Segments.ONLY_VISIBLE
+ intervals
);
RetrieveUsedSegmentsAction actual =
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
index 8b488fff809..af920ebbeb7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
@@ -41,7 +41,6 @@ public class TaskConfigBuilder
private Boolean storeEmptyColumns;
private boolean enableTaskLevelLogPush;
private Long tmpStorageBytesPerTask;
- private Boolean enableConcurrentAppendAndReplace;
public TaskConfigBuilder setBaseDir(String baseDir)
{
@@ -133,18 +132,6 @@ public class TaskConfigBuilder
return this;
}
- public TaskConfigBuilder enableConcurrentAppendAndReplace()
- {
- this.enableConcurrentAppendAndReplace = true;
- return this;
- }
-
- public TaskConfigBuilder disableConcurrentAppendAndReplace()
- {
- this.enableConcurrentAppendAndReplace = false;
- return this;
- }
-
public TaskConfig build()
{
return new TaskConfig(
@@ -162,8 +149,7 @@ public class TaskConfigBuilder
batchProcessingMode,
storeEmptyColumns,
enableTaskLevelLogPush,
- tmpStorageBytesPerTask,
- enableConcurrentAppendAndReplace
+ tmpStorageBytesPerTask
);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 7f83a8f0233..3fda953a454 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -27,7 +27,6 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
-import
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -955,7 +954,7 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
try {
final TaskActionClient taskActionClient =
taskActionClientFactory.create(task);
Collection<DataSegment> allUsedSegments = taskActionClient.submit(
- new RetrieveSegmentsToReplaceAction(
+ new RetrieveUsedSegmentsAction(
WIKI,
Collections.singletonList(interval)
)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index b0b347e25a8..002c70262cb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -616,7 +616,6 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
.setDefaultRowFlushBoundary(50000)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.setTmpStorageBytesPerTask(-1L)
- .enableConcurrentAppendAndReplace()
.build();
return new TaskToolboxFactory(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]