This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 92906059d2 Remove segmentsToBeDropped from
SegmentTransactionInsertAction (#14883)
92906059d2 is described below
commit 92906059d2997a6902eac03b2a5198e5164043df
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 21 20:08:56 2023 +0530
Remove segmentsToBeDropped from SegmentTransactionInsertAction (#14883)
Motivation:
- There is no usage of the `SegmentTransactionInsertAction` which passes a
non-null non-empty value of `segmentsToBeDropped`.
- This is not really needed either as overshadowed segments are marked as
unused
by the Coordinator and need not be done in the same transaction as
committing segments.
- It will also help simplify the changes being made in #14407
Changes:
- Remove `segmentsToBeDropped` from the task action and all intermediate
methods
- Remove related tests which are not needed anymore
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 2 +-
.../actions/SegmentTransactionalInsertAction.java | 27 +---
.../task/AppenderatorDriverRealtimeIndexTask.java | 8 +-
.../druid/indexing/common/task/IndexTask.java | 5 +-
.../parallel/ParallelIndexSupervisorTask.java | 9 +-
.../indexing/seekablestream/SequenceMetadata.java | 9 +-
.../SegmentTransactionalInsertActionTest.java | 60 --------
.../AppenderatorDriverRealtimeIndexTaskTest.java | 7 +-
.../seekablestream/SequenceMetadataTest.java | 34 +----
.../TestIndexerMetadataStorageCoordinator.java | 1 -
.../indexing/overlord/DataSourceMetadata.java | 2 +-
.../IndexerMetadataStorageCoordinator.java | 2 -
.../IndexerSQLMetadataStorageCoordinator.java | 80 +---------
.../appenderator/BaseAppenderatorDriver.java | 2 -
.../appenderator/BatchAppenderatorDriver.java | 3 -
.../appenderator/StreamAppenderatorDriver.java | 1 -
.../TransactionalSegmentPublisher.java | 3 -
.../IndexerSQLMetadataStorageCoordinatorTest.java | 169 ---------------------
...edSegmentsSinksBatchAppenderatorDriverTest.java | 6 +-
...dClosedSegmentsBatchAppenderatorDriverTest.java | 6 +-
.../appenderator/StreamAppenderatorDriverTest.java | 4 +-
21 files changed, 27 insertions(+), 413 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 2373a59ec9..9aa94e9e47 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1306,7 +1306,7 @@ public class ControllerImpl implements Controller
} else {
performSegmentPublish(
context.taskActionClient(),
- SegmentTransactionalInsertAction.overwriteAction(null, null,
segmentsWithTombstones)
+ SegmentTransactionalInsertAction.overwriteAction(null,
segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index a0567dce04..cd11e0befd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -66,12 +66,6 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
* Set of segments to be inserted into metadata storage
*/
private final Set<DataSegment> segments;
- /**
- * Set of segments to be dropped (mark unused) when new segments, {@link
SegmentTransactionalInsertAction#segments},
- * are inserted into metadata storage.
- */
- @Nullable
- private final Set<DataSegment> segmentsToBeDropped;
@Nullable
private final DataSourceMetadata startMetadata;
@@ -82,11 +76,10 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
- @Nullable Set<DataSegment> segmentsToBeDropped,
Set<DataSegment> segmentsToPublish
)
{
- return new SegmentTransactionalInsertAction(segmentsToBeOverwritten,
segmentsToBeDropped, segmentsToPublish, null, null, null);
+ return new SegmentTransactionalInsertAction(segmentsToBeOverwritten,
segmentsToPublish, null, null, null);
}
public static SegmentTransactionalInsertAction appendAction(
@@ -95,7 +88,7 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
@Nullable DataSourceMetadata endMetadata
)
{
- return new SegmentTransactionalInsertAction(null, null, segments,
startMetadata, endMetadata, null);
+ return new SegmentTransactionalInsertAction(null, segments, startMetadata,
endMetadata, null);
}
public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
@@ -104,13 +97,12 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
DataSourceMetadata endMetadata
)
{
- return new SegmentTransactionalInsertAction(null, null, null,
startMetadata, endMetadata, dataSource);
+ return new SegmentTransactionalInsertAction(null, null, startMetadata,
endMetadata, dataSource);
}
@JsonCreator
private SegmentTransactionalInsertAction(
@JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment>
segmentsToBeOverwritten,
- @JsonProperty("segmentsToBeDropped") @Nullable Set<DataSegment>
segmentsToBeDropped,
@JsonProperty("segments") @Nullable Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata
startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
@@ -118,7 +110,6 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
)
{
this.segmentsToBeOverwritten = segmentsToBeOverwritten;
- this.segmentsToBeDropped = segmentsToBeDropped;
this.segments = segments == null ? ImmutableSet.of() :
ImmutableSet.copyOf(segments);
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
@@ -132,13 +123,6 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
return segmentsToBeOverwritten;
}
- @JsonProperty
- @Nullable
- public Set<DataSegment> getSegmentsToBeDropped()
- {
- return segmentsToBeDropped;
- }
-
@JsonProperty
public Set<DataSegment> getSegments()
{
@@ -202,9 +186,6 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
if (segmentsToBeOverwritten != null) {
allSegments.addAll(segmentsToBeOverwritten);
}
- if (segmentsToBeDropped != null) {
- allSegments.addAll(segmentsToBeDropped);
- }
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(),
allSegments);
@@ -224,7 +205,6 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
.onValidLocks(
() ->
toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
- segmentsToBeDropped,
startMetadata,
endMetadata
)
@@ -359,7 +339,6 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
", dataSource='" + dataSource + '\'' +
- ", segmentsToBeDropped=" +
SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 45746aa636..0d12f2ba2e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -350,19 +350,13 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
int sequenceNumber = 0;
String sequenceName = makeSequenceName(getId(), sequenceNumber);
- final TransactionalSegmentPublisher publisher =
(mustBeNullOrEmptyOverwriteSegments, mustBeNullOrEmptyDropSegments, segments,
commitMetadata) -> {
+ final TransactionalSegmentPublisher publisher =
(mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptyOverwriteSegments != null &&
!mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to overwrite
segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
- if (mustBeNullOrEmptyDropSegments != null &&
!mustBeNullOrEmptyDropSegments.isEmpty()) {
- throw new ISE(
- "Stream ingestion task unexpectedly attempted to drop segments:
%s",
-
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
- );
- }
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(
segments,
null,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 3a4b574d4e..4642b4391d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -912,9 +912,9 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
}
- final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish, commitMetadata) ->
+ final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten,
segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient()
-
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish));
+
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
segmentsToPublish));
String effectiveId =
getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
@@ -996,7 +996,6 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
final SegmentsAndCommitMetadata published =
awaitPublish(driver.publishAll(
inputSegments,
- null,
tombStones,
publisher,
annotateFunction
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index bfe69d6ee0..f5653c1c86 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1170,16 +1170,13 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
}
- final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish, commitMetadata) ->
+ final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten,
segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
-
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
segmentsToDrop, segmentsToPublish)
+
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten,
segmentsToPublish)
);
final boolean published =
newSegments.isEmpty()
- || publisher.publishSegments(oldSegments,
- Collections.emptySet(),
- newSegments, annotateFunction,
- null).isSuccess();
+ || publisher.publishSegments(oldSegments, newSegments,
annotateFunction, null).isSuccess();
if (published) {
LOG.info("Published [%d] segments", newSegments.size());
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 6a786523d4..161a36de2f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -337,7 +337,6 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
@Override
public SegmentPublishResult publishAnnotatedSegments(
@Nullable Set<DataSegment> mustBeNullOrEmptyOverwriteSegments,
- @Nullable Set<DataSegment> mustBeNullOrEmptyDropSegments,
Set<DataSegment> segmentsToPush,
@Nullable Object commitMetadata
) throws IOException
@@ -348,13 +347,7 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
- if (mustBeNullOrEmptyDropSegments != null &&
!mustBeNullOrEmptyDropSegments.isEmpty()) {
- throw new ISE(
- "Stream ingestion task unexpectedly attempted to drop segments:
%s",
-
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
- );
- }
- final Map commitMetaMap = (Map)
Preconditions.checkNotNull(commitMetadata, "commitMetadata");
+ final Map<?, ?> commitMetaMap = (Map<?, ?>)
Preconditions.checkNotNull(commitMetadata, "commitMetadata");
final SeekableStreamEndSequenceNumbers<PartitionIdType,
SequenceOffsetType> finalPartitions =
runner.deserializePartitionsFromMetadata(
toolbox.getJsonMapper(),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 0243a0d20d..b8bdbfb2ad 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -134,39 +134,6 @@ public class SegmentTransactionalInsertActionTest
);
}
- @Test
- public void testTransactionalDropSegments() throws Exception
- {
- final Task task = NoopTask.create();
- actionTestKit.getTaskLockbox().add(task);
- acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
-
- SegmentPublishResult result1 =
SegmentTransactionalInsertAction.overwriteAction(
- null,
- null,
- ImmutableSet.of(SEGMENT1)
- ).perform(
- task,
- actionTestKit.getTaskActionToolbox()
- );
- Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)),
result1);
-
- SegmentPublishResult result2 =
SegmentTransactionalInsertAction.overwriteAction(
- null,
- ImmutableSet.of(SEGMENT1),
- ImmutableSet.of(SEGMENT2)
- ).perform(
- task,
- actionTestKit.getTaskActionToolbox()
- );
- Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)),
result2);
-
- Assertions.assertThat(
- actionTestKit.getMetadataStorageCoordinator()
- .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL,
Segments.ONLY_VISIBLE)
- ).containsExactlyInAnyOrder(SEGMENT2);
- }
-
@Test
public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
{
@@ -193,38 +160,11 @@ public class SegmentTransactionalInsertActionTest
);
}
- @Test
- public void testFailTransactionalDropSegment() throws Exception
- {
- final Task task = NoopTask.create();
- actionTestKit.getTaskLockbox().add(task);
- acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
-
- SegmentPublishResult result =
SegmentTransactionalInsertAction.overwriteAction(
- null,
- // SEGMENT1 does not exist, hence will fail to drop
- ImmutableSet.of(SEGMENT1),
- ImmutableSet.of(SEGMENT2)
- ).perform(
- task,
- actionTestKit.getTaskActionToolbox()
- );
-
- Assert.assertEquals(
- SegmentPublishResult.fail(
- "org.apache.druid.metadata.RetryTransactionException: " +
- "Failed to drop some segments. Only 0 could be dropped out of 1.
Trying again"
- ),
- result
- );
- }
-
@Test
public void testFailBadVersion() throws Exception
{
final Task task = NoopTask.create();
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.overwriteAction(
- null,
null,
ImmutableSet.of(SEGMENT3)
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 7ea82b8f57..049bc11e0f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1525,16 +1525,15 @@ public class AppenderatorDriverRealtimeIndexTaskTest
extends InitializedNullHand
@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
- Set<DataSegment> segmentsToDrop,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
) throws IOException
{
- SegmentPublishResult result =
super.announceHistoricalSegments(segments, segmentsToDrop, startMetadata,
endMetadata);
+ SegmentPublishResult result =
super.announceHistoricalSegments(segments, startMetadata, endMetadata);
- Assert.assertFalse(
+ Assert.assertNotNull(
"Segment latch not initialized, did you forget to call
expectPublishSegments?",
- segmentLatch == null
+ segmentLatch
);
publishedSegments.addAll(result.getSegments());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
index 9f03d5f2e8..aae07194bb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
@@ -85,37 +85,7 @@ public class SequenceMetadataTest
"Stream ingestion task unexpectedly attempted to overwrite segments: "
+ SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
);
-
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment,
null, ImmutableSet.of(), null);
- }
-
- @Test
- public void
testPublishAnnotatedSegmentsThrowExceptionIfDropSegmentsNotNullAndNotEmpty()
throws Exception
- {
- DataSegment dataSegment = DataSegment.builder()
- .dataSource("foo")
- .interval(Intervals.of("2001/P1D"))
- .shardSpec(new LinearShardSpec(1))
- .version("b")
- .size(0)
- .build();
-
- Set<DataSegment> notNullNotEmptySegment = ImmutableSet.of(dataSegment);
- SequenceMetadata<Integer, Integer> sequenceMetadata = new
SequenceMetadata<>(
- 1,
- "test",
- ImmutableMap.of(),
- ImmutableMap.of(),
- true,
- ImmutableSet.of()
- );
- TransactionalSegmentPublisher transactionalSegmentPublisher =
sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner,
mockTaskToolbox, true);
-
- expectedException.expect(ISE.class);
- expectedException.expectMessage(
- "Stream ingestion task unexpectedly attempted to drop segments: " +
SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
- );
-
- transactionalSegmentPublisher.publishAnnotatedSegments(null,
notNullNotEmptySegment, ImmutableSet.of(), null);
+
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment,
ImmutableSet.of(), null);
}
@Test
@@ -143,6 +113,6 @@ public class SequenceMetadataTest
);
TransactionalSegmentPublisher transactionalSegmentPublisher =
sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner,
mockTaskToolbox, false);
- transactionalSegmentPublisher.publishAnnotatedSegments(null, null,
notNullNotEmptySegment, ImmutableMap.of());
+ transactionalSegmentPublisher.publishAnnotatedSegments(null,
notNullNotEmptySegment, ImmutableMap.of());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 0606889b7f..41a688fc9f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -159,7 +159,6 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
- Set<DataSegment> segmentsToDrop,
DataSourceMetadata oldCommitMetadata,
DataSourceMetadata newCommitMetadata
)
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java
b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java
index 6cad3c1264..23579a0c8a 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java
@@ -26,7 +26,7 @@ import java.util.Set;
/**
* Commit metadata for a dataSource. Used by
- * {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set,
Set, DataSourceMetadata, DataSourceMetadata)}
+ * {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set,
DataSourceMetadata, DataSourceMetadata)}
* to provide metadata transactions for segment inserts.
*
* Two metadata instances can be added together, and any conflicts are
resolved in favor of the right-hand side.
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 589b60f027..a954c1f2d6 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -257,7 +257,6 @@ public interface IndexerMetadataStorageCoordinator
* {@param segments} and dropping {@param segmentsToDrop}
*
* @param segments set of segments to add, must all be from the same
dataSource
- * @param segmentsToDrop set of segments to drop, must all be from the same
dataSource
* @param startMetadata dataSource metadata pre-insert must match this
startMetadata according to
* {@link
DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will
* not involve a metadata transaction
@@ -274,7 +273,6 @@ public interface IndexerMetadataStorageCoordinator
*/
SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
- Set<DataSegment> segmentsToDrop,
@Nullable DataSourceMetadata startMetadata,
@Nullable DataSourceMetadata endMetadata
) throws IOException;
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 3773aa466d..25a96a3076 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -303,7 +303,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
@Override
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment>
segments) throws IOException
{
- final SegmentPublishResult result = announceHistoricalSegments(segments,
null, null, null);
+ final SegmentPublishResult result = announceHistoricalSegments(segments,
null, null);
// Metadata transaction cannot fail because we are not trying to do one.
if (!result.isSuccess()) {
@@ -316,7 +316,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
@Override
public SegmentPublishResult announceHistoricalSegments(
final Set<DataSegment> segments,
- final Set<DataSegment> segmentsToDrop,
@Nullable final DataSourceMetadata startMetadata,
@Nullable final DataSourceMetadata endMetadata
) throws IOException
@@ -382,27 +381,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- if (segmentsToDrop != null && !segmentsToDrop.isEmpty()) {
- final DataStoreMetadataUpdateResult result =
dropSegmentsWithHandle(
- handle,
- segmentsToDrop,
- dataSource
- );
- if (result.isFailed()) {
- // Metadata store was definitely not updated.
- transactionStatus.setRollbackOnly();
- definitelyNotUpdated.set(true);
-
- if (result.canRetry()) {
- throw new RetryTransactionException(result.getErrorMsg());
- } else {
- throw new RuntimeException(result.getErrorMsg());
- }
- }
- }
-
final Set<DataSegment> inserted =
announceHistoricalSegmentBatch(handle, segments, usedSegments);
-
return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
}
},
@@ -1533,7 +1512,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
*
* @return SUCCESS if dataSource metadata was updated from matching
startMetadata to matching endMetadata, FAILURE or
* TRY_AGAIN if it definitely was not updated. This guarantee is meant to
help
- * {@link #announceHistoricalSegments(Set, Set, DataSourceMetadata,
DataSourceMetadata)}
+ * {@link #announceHistoricalSegments(Set, DataSourceMetadata,
DataSourceMetadata)}
* achieve its own guarantee.
*
* @throws RuntimeException if state is unknown after this call
@@ -1653,61 +1632,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return retVal;
}
- /**
- * Mark segments as unsed in a transaction. This method is idempotent in
that if
- * the segments was already marked unused, it will return true.
- *
- * @param handle database handle
- * @param segmentsToDrop segments to mark as unused
- * @param dataSource druid dataSource
- *
- * @return SUCCESS if segment was marked unused, FAILURE or
- * TRY_AGAIN if it definitely was not updated. This guarantee is meant to
help
- * {@link #announceHistoricalSegments(Set, Set, DataSourceMetadata,
DataSourceMetadata)}
- * achieve its own guarantee.
- *
- * @throws RuntimeException if state is unknown after this call
- */
- protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(
- final Handle handle,
- final Collection<DataSegment> segmentsToDrop,
- final String dataSource
- )
- {
- Preconditions.checkNotNull(dataSource, "dataSource");
- Preconditions.checkNotNull(segmentsToDrop, "segmentsToDrop");
-
- if (segmentsToDrop.isEmpty()) {
- return DataStoreMetadataUpdateResult.SUCCESS;
- }
-
- if (segmentsToDrop.stream().anyMatch(segment ->
!dataSource.equals(segment.getDataSource()))) {
- // All segments to drop must belong to the same datasource
- return new DataStoreMetadataUpdateResult(
- true,
- false,
- "Not dropping segments, as not all segments belong to the
datasource[%s].",
- dataSource);
- }
-
- final int numChangedSegments =
- SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables,
jsonMapper).markSegments(
-
segmentsToDrop.stream().map(DataSegment::getId).collect(Collectors.toList()),
- false
- );
-
- if (numChangedSegments != segmentsToDrop.size()) {
- return new DataStoreMetadataUpdateResult(
- true,
- true,
- "Failed to drop some segments. Only %d could be dropped out of %d.
Trying again",
- numChangedSegments,
- segmentsToDrop.size()
- );
- }
- return DataStoreMetadataUpdateResult.SUCCESS;
- }
-
@Override
public boolean deleteDataSourceMetadata(final String dataSource)
{
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 65a8a676d3..c050d9c5e4 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -560,7 +560,6 @@ public abstract class BaseAppenderatorDriver implements
Closeable
*/
ListenableFuture<SegmentsAndCommitMetadata> publishInBackground(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
- @Nullable Set<DataSegment> segmentsToBeDropped,
@Nullable Set<DataSegment> tombstones,
SegmentsAndCommitMetadata segmentsAndCommitMetadata,
TransactionalSegmentPublisher publisher,
@@ -601,7 +600,6 @@ public abstract class BaseAppenderatorDriver implements
Closeable
final ImmutableSet<DataSegment> ourSegments =
ImmutableSet.copyOf(pushedAndTombstones);
final SegmentPublishResult publishResult =
publisher.publishSegments(
segmentsToBeOverwritten,
- segmentsToBeDropped,
ourSegments,
outputSegmentsAnnotateFunction,
callerMetadata
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
index 7a99f200be..cf2efef2f8 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
@@ -192,14 +192,12 @@ public class BatchAppenderatorDriver extends
BaseAppenderatorDriver
* Publish all segments.
*
* @param segmentsToBeOverwritten segments which can be overwritten by new
segments published by the given publisher
- * @param segmentsToBeDropped segments which will be dropped and marked
unused
* @param publisher segment publisher
*
* @return a {@link ListenableFuture} for the publish task
*/
public ListenableFuture<SegmentsAndCommitMetadata> publishAll(
@Nullable final Set<DataSegment> segmentsToBeOverwritten,
- @Nullable final Set<DataSegment> segmentsToBeDropped,
@Nullable final Set<DataSegment> tombstones,
final TransactionalSegmentPublisher publisher,
final Function<Set<DataSegment>, Set<DataSegment>>
outputSegmentsAnnotateFunction
@@ -212,7 +210,6 @@ public class BatchAppenderatorDriver extends
BaseAppenderatorDriver
return publishInBackground(
segmentsToBeOverwritten,
- segmentsToBeDropped,
tombstones == null ? Collections.emptySet() : tombstones,
new SegmentsAndCommitMetadata(
snapshot
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 8c7ec417f8..f822663196 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -282,7 +282,6 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
// version of a segment with the same identifier containing different
data; see DataSegmentPusher.push() docs
pushInBackground(wrapCommitter(committer), theSegments, true),
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>)
sam -> publishInBackground(
- null,
null,
null,
sam,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index a71e4cdb6d..2ffb4dd572 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -41,14 +41,12 @@ public interface TransactionalSegmentPublisher
*/
SegmentPublishResult publishAnnotatedSegments(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
- @Nullable Set<DataSegment> segmentsToDrop,
Set<DataSegment> segmentsToPublish,
@Nullable Object commitMetadata
) throws IOException;
default SegmentPublishResult publishSegments(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
- @Nullable Set<DataSegment> segmentsToDrop,
Set<DataSegment> segmentsToPublish,
Function<Set<DataSegment>, Set<DataSegment>>
outputSegmentsAnnotateFunction,
@Nullable Object commitMetadata
@@ -58,7 +56,6 @@ public interface TransactionalSegmentPublisher
.andThen(SegmentPublisherHelper::annotateShardSpec);
return publishAnnotatedSegments(
segmentsToBeOverwritten,
- segmentsToDrop,
annotateFunction.apply(segmentsToPublish),
commitMetadata
);
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9fadb8b106..68654c8855 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -352,18 +352,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
return super.updateDataSourceMetadataWithHandle(handle, dataSource,
startMetadata, endMetadata);
}
- @Override
- protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(
- final Handle handle,
- final Collection<DataSegment> segmentsToDrop,
- final String dataSource
- )
- {
- // Count number of times this method is called.
- segmentTableDropUpdateCounter.getAndIncrement();
- return super.dropSegmentsWithHandle(handle, segmentsToDrop,
dataSource);
- }
-
@Override
public int getSqlMetadataMaxRetry()
{
@@ -560,7 +548,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// Insert first segment.
final SegmentPublishResult result1 =
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
@@ -579,7 +566,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// Insert second segment.
final SegmentPublishResult result2 =
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
- ImmutableSet.of(),
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
@@ -636,7 +622,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// Insert first segment.
final SegmentPublishResult result1 =
failOnceCoordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
@@ -658,7 +643,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// Insert second segment.
final SegmentPublishResult result2 =
failOnceCoordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
- ImmutableSet.of(),
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
@@ -689,7 +673,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
final SegmentPublishResult result1 =
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
@@ -702,116 +685,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(1, metadataUpdateCounter.get());
}
- @Test
- public void testTransactionalAnnounceFailSegmentDropFailWithoutRetry()
throws IOException
- {
- insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2));
-
- Assert.assertEquals(
- ImmutableList.of(existingSegment1.getId().toString(),
existingSegment2.getId().toString()),
- retrieveUsedSegmentIds()
- );
-
- DataSegment dataSegmentBar = DataSegment.builder()
- .dataSource("bar")
- .interval(Intervals.of("2001/P1D"))
- .shardSpec(new LinearShardSpec(1))
- .version("b")
- .size(0)
- .build();
- Set<DataSegment> dropSegments = ImmutableSet.of(existingSegment1,
existingSegment2, dataSegmentBar);
-
- final SegmentPublishResult result1 =
coordinator.announceHistoricalSegments(
- SEGMENTS,
- dropSegments,
- null,
- null
- );
- Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException:
Not dropping segments, " +
- "as not all segments belong to the datasource[fooDataSource]."),
result1);
-
- // Should only be tried once. Since dropSegmentsWithHandle will return
FAILURE (not TRY_AGAIN) as set of
- // segments to drop contains more than one datasource.
- Assert.assertEquals(1, segmentTableDropUpdateCounter.get());
-
- Assert.assertEquals(
- ImmutableList.of(existingSegment1.getId().toString(),
existingSegment2.getId().toString()),
- retrieveUsedSegmentIds()
- );
- }
-
- @Test
- public void testTransactionalAnnounceSucceedWithSegmentDrop() throws
IOException
- {
- insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2));
-
- Assert.assertEquals(
- ImmutableList.of(existingSegment1.getId().toString(),
existingSegment2.getId().toString()),
- retrieveUsedSegmentIds()
- );
-
- final SegmentPublishResult result1 =
coordinator.announceHistoricalSegments(
- SEGMENTS,
- ImmutableSet.of(existingSegment1, existingSegment2),
- null,
- null
- );
-
- Assert.assertEquals(SegmentPublishResult.ok(SEGMENTS), result1);
-
- for (DataSegment segment : SEGMENTS) {
- Assert.assertArrayEquals(
- mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
- derbyConnector.lookup(
-
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
- "id",
- "payload",
- segment.getId().toString()
- )
- );
- }
-
- Assert.assertEquals(
- ImmutableList.of(defaultSegment.getId().toString(),
defaultSegment2.getId().toString()),
- retrieveUsedSegmentIds()
- );
- }
-
- @Test
- public void testTransactionalAnnounceFailSegmentDropFailWithRetry() throws
IOException
- {
- insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2));
-
- Assert.assertEquals(
- ImmutableList.of(existingSegment1.getId().toString(),
existingSegment2.getId().toString()),
- retrieveUsedSegmentIds()
- );
-
- Set<DataSegment> dropSegments = ImmutableSet.of(existingSegment1,
defaultSegment4);
- final SegmentPublishResult result1 =
coordinator.announceHistoricalSegments(
- SEGMENTS,
- dropSegments,
- null,
- null
- );
- Assert.assertEquals(SegmentPublishResult.fail(
- "org.apache.druid.metadata.RetryTransactionException: Failed to drop
some segments. " +
- "Only 1 could be dropped out of 2. Trying again"), result1);
-
- Assert.assertEquals(MAX_SQL_MEATADATA_RETRY_FOR_TEST,
segmentTableDropUpdateCounter.get());
-
- Assert.assertEquals(
- ImmutableList.of(existingSegment1.getId().toString(),
existingSegment2.getId().toString()),
- retrieveUsedSegmentIds()
- );
- }
-
@Test
public void testTransactionalAnnounceFailDbNotNullWantNull() throws
IOException
{
final SegmentPublishResult result1 =
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
@@ -819,7 +697,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
final SegmentPublishResult result2 =
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
@@ -837,7 +714,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
final SegmentPublishResult result1 =
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
@@ -845,7 +721,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
final SegmentPublishResult result2 =
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
- ImmutableSet.of(),
new ObjectMetadata(ImmutableMap.of("foo", "qux")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
@@ -1393,7 +1268,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
@@ -2304,52 +2178,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertNull(id);
}
- @Test
- public void testDropSegmentsWithHandleForSegmentThatExist()
- {
- try (Handle handle = derbyConnector.getDBI().open()) {
- Assert.assertTrue(insertUsedSegments(ImmutableSet.of(defaultSegment)));
- List<String> usedSegments = retrieveUsedSegmentIds();
- Assert.assertEquals(1, usedSegments.size());
- Assert.assertEquals(defaultSegment.getId().toString(),
usedSegments.get(0));
-
- // Try drop segment
- IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult
result = coordinator.dropSegmentsWithHandle(
- handle,
- ImmutableSet.of(defaultSegment),
- defaultSegment.getDataSource()
- );
-
-
Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.SUCCESS,
result);
- usedSegments = retrieveUsedSegmentIds();
- Assert.assertEquals(0, usedSegments.size());
- }
- }
-
- @Test
- public void testDropSegmentsWithHandleForSegmentThatDoesNotExist()
- {
- try (Handle handle = derbyConnector.getDBI().open()) {
- // Try drop segment
- IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult
result = coordinator.dropSegmentsWithHandle(
- handle,
- ImmutableSet.of(defaultSegment),
- defaultSegment.getDataSource()
- );
- Assert.assertEquals(new
IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult(
- true,
- true,
- "Failed to drop some segments. Only 0 could be dropped out of 1.
Trying again"),
- result);
- }
- }
-
@Test
public void
testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDeleted()
throws Exception
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
@@ -2378,7 +2211,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
@@ -2404,7 +2236,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
- ImmutableSet.of(),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java
index a2f24ebd7e..0dcb987c59 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java
@@ -127,7 +127,7 @@ public class ClosedSegmentsSinksBatchAppenderatorDriverTest
extends EasyMockSupp
checkSegmentStates(2, SegmentState.PUSHED_AND_DROPPED);
final SegmentsAndCommitMetadata published =
- driver.publishAll(null, null, Collections.emptySet(),
makeOkPublisher(), Function.identity())
+ driver.publishAll(null, Collections.emptySet(), makeOkPublisher(),
Function.identity())
.get(TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(
@@ -162,7 +162,7 @@ public class ClosedSegmentsSinksBatchAppenderatorDriverTest
extends EasyMockSupp
}
final SegmentsAndCommitMetadata published =
- driver.publishAll(null, null, Collections.emptySet(),
makeOkPublisher(), Function.identity())
+ driver.publishAll(null, Collections.emptySet(), makeOkPublisher(),
Function.identity())
.get(TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(
@@ -204,7 +204,7 @@ public class ClosedSegmentsSinksBatchAppenderatorDriverTest
extends EasyMockSupp
static TransactionalSegmentPublisher makeOkPublisher()
{
- return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish,
commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
+ return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
SegmentPublishResult.ok(ImmutableSet.of());
}
static class TestSegmentAllocator implements SegmentAllocator
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java
index 1fb6c79096..9657521a52 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java
@@ -125,7 +125,7 @@ public class
OpenAndClosedSegmentsBatchAppenderatorDriverTest extends EasyMockSu
checkSegmentStates(2, SegmentState.PUSHED_AND_DROPPED);
final SegmentsAndCommitMetadata published =
- driver.publishAll(null, null, null, makeOkPublisher(),
Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
+ driver.publishAll(null, null, makeOkPublisher(),
Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -159,7 +159,7 @@ public class
OpenAndClosedSegmentsBatchAppenderatorDriverTest extends EasyMockSu
}
final SegmentsAndCommitMetadata published =
- driver.publishAll(null, null, null, makeOkPublisher(),
Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
+ driver.publishAll(null, null, makeOkPublisher(),
Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -200,6 +200,6 @@ public class
OpenAndClosedSegmentsBatchAppenderatorDriverTest extends EasyMockSu
static TransactionalSegmentPublisher makeOkPublisher()
{
- return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish,
commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
+ return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
SegmentPublishResult.ok(ImmutableSet.of());
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 476ecded7e..8f2b77c4fc 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -375,13 +375,13 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
static TransactionalSegmentPublisher makeOkPublisher()
{
- return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish,
commitMetadata) ->
+ return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
SegmentPublishResult.ok(Collections.emptySet());
}
static TransactionalSegmentPublisher makeFailingPublisher(boolean
failWithException)
{
- return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish,
commitMetadata) -> {
+ return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> {
final RuntimeException exception = new RuntimeException("test");
if (failWithException) {
throw exception;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]