This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 00ce18a Additional Kinesis resharding fixes (#8870)
00ce18a is described below
commit 00ce18a0ea1b984a07479489d43d9901575d3d28
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Nov 28 12:59:01 2019 -0800
Additional Kinesis resharding fixes (#8870)
* Additional Kinesis resharding fixes
* Address PR comments
* Remove unused method
* Adjust SegmentTransactionalInsertAction null handling
* Check for unchanged metadata on empty publish
* Add logs for empty publish
* Fix javadoc
* Clear offset when invalid endOffsets are seen
* Fix LGTM alert
* Fix build
* Add resharding note to Kinesis docs
* Checkstyle
* Spelling
* Address PR comments
* Checkstyle
---
.../extensions-core/kinesis-ingestion.md | 11 +
.../indexing/kafka/supervisor/KafkaSupervisor.java | 5 -
.../kafka/supervisor/KafkaSupervisorTest.java | 7 +-
.../kinesis/supervisor/KinesisSupervisor.java | 73 +++--
.../kinesis/supervisor/KinesisSupervisorTest.java | 70 +++--
.../actions/SegmentTransactionalInsertAction.java | 50 +++-
.../apache/druid/indexing/overlord/TaskQueue.java | 27 ++
.../indexing/seekablestream/SequenceMetadata.java | 91 +++++-
.../supervisor/SeekableStreamSupervisor.java | 317 ++++++++++++++-------
.../TestIndexerMetadataStorageCoordinator.java | 10 +
.../IndexerMetadataStorageCoordinator.java | 27 ++
.../IndexerSQLMetadataStorageCoordinator.java | 68 +++++
.../appenderator/BaseAppenderatorDriver.java | 20 +-
.../TransactionalSegmentPublisher.java | 9 +
website/.spelling | 2 +
15 files changed, 609 insertions(+), 178 deletions(-)
diff --git a/docs/development/extensions-core/kinesis-ingestion.md
b/docs/development/extensions-core/kinesis-ingestion.md
index aec4699..9ede4f3 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -440,3 +440,14 @@ compatible with Apache projects.
To enable this feature, add the `amazon-kinesis-client` (tested on version
`1.9.2`) jar file
([link](https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client/1.9.2))
under `dist/druid/extensions/druid-kinesis-indexing-service/`.
Then when submitting a supervisor-spec, set `deaggregate` to true.
+
+## Resharding
+
+When changing the shard count for a Kinesis stream, there will be a window of
time around the resharding operation during which Kinesis ingestion tasks may be
shut down early.
+This occurs because the supervisor updates the shard -> task group
mappings as shards are closed and fully read, to ensure that tasks are not
running
+with assignments of closed shards that have already been fully read, and to
ensure a balanced distribution of active shards across tasks.
+
+This window of early task shutdowns ends when:
+- All closed shards have been fully read and the Kinesis ingestion tasks have
published the data from those shards, committing the "closed" state to metadata
storage
+- Any remaining tasks that had inactive shards in their assignment have been
shut down (these tasks would have been created before the closed shards were
completely drained)
+
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 98ba169..99c8105 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -154,14 +154,9 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<Integer, Long>
);
}
-
@Override
protected int getTaskGroupIdForPartition(Integer partitionId)
{
- // record partitionIds so that supervisor knows when a partition is
discovered.
- if (!partitionIds.contains(partitionId)) {
- partitionIds.add(partitionId);
- }
return partitionId % spec.getIoConfig().getTaskCount();
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 47b2d11..a50356f 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1274,12 +1274,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.times(2);
EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
- .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 1, 20L,
2, 30L)))
- .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 1, 15L,
2, 35L)));
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 2,
30L)))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 2,
35L)));
EasyMock.expect(
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
- EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
+ EasyMock.eq(ImmutableMap.of(0, 10L, 2, 35L)),
EasyMock.eq(true)
)
).andReturn(Futures.immediateFuture(true)).times(2);
@@ -1312,7 +1312,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(10L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0));
- Assert.assertEquals(20L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1));
Assert.assertEquals(35L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2));
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index f7454a4..f150067 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
@@ -66,7 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -81,8 +78,6 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
{
- private static final HashFunction HASH_FUNCTION = Hashing.sha1();
-
private static final EmittingLogger log = new
EmittingLogger(KinesisSupervisor.class);
public static final TypeReference<TreeMap<Integer, Map<String, String>>>
CHECKPOINTS_TYPE_REF =
@@ -232,39 +227,32 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
@Override
protected int getTaskGroupIdForPartition(String partitionId)
{
- if (!partitionIds.contains(partitionId)) {
- partitionIds.add(partitionId);
- }
-
return getTaskGroupIdForPartitionWithProvidedList(partitionId,
partitionIds);
}
private int getTaskGroupIdForPartitionWithProvidedList(String partitionId,
List<String> availablePartitions)
{
+ int index = availablePartitions.indexOf(partitionId);
+ if (index < 0) {
+ return index;
+ }
return availablePartitions.indexOf(partitionId) %
spec.getIoConfig().getTaskCount();
}
@Override
- protected Map<Integer, ConcurrentHashMap<String, String>>
recomputePartitionGroupsForExpiration(
- Set<String> availablePartitions
- )
+ protected Map<Integer, Set<String>>
recomputePartitionGroupsForExpiration(Set<String> availablePartitions)
{
List<String> availablePartitionsList = new
ArrayList<>(availablePartitions);
- Map<Integer, ConcurrentHashMap<String, String>> newPartitionGroups = new
HashMap<>();
-
- for (ConcurrentHashMap<String, String> oldGroup :
partitionGroups.values()) {
- for (Map.Entry<String, String> partitionOffsetMapping :
oldGroup.entrySet()) {
- String partitionId = partitionOffsetMapping.getKey();
- if (availablePartitions.contains(partitionId)) {
- int newTaskGroupId =
getTaskGroupIdForPartitionWithProvidedList(partitionId,
availablePartitionsList);
- ConcurrentHashMap<String, String> partitionMap =
newPartitionGroups.computeIfAbsent(
- newTaskGroupId,
- k -> new ConcurrentHashMap<>()
- );
- partitionMap.put(partitionId, partitionOffsetMapping.getValue());
- }
- }
+ Map<Integer, Set<String>> newPartitionGroups = new HashMap<>();
+
+ for (String availablePartition : availablePartitions) {
+ int newTaskGroupId =
getTaskGroupIdForPartitionWithProvidedList(availablePartition,
availablePartitionsList);
+ Set<String> newGroup = newPartitionGroups.computeIfAbsent(
+ newTaskGroupId,
+ k -> new HashSet<>()
+ );
+ newGroup.add(availablePartition);
}
return newPartitionGroups;
@@ -398,6 +386,32 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
{
log.info("Marking expired shards in metadata: " + expiredPartitionIds);
+ return createDataSourceMetadataWithClosedOrExpiredPartitions(
+ currentMetadata,
+ expiredPartitionIds,
+ KinesisSequenceNumber.EXPIRED_MARKER
+ );
+ }
+
+ @Override
+ protected SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetadataWithClosedPartitions(
+ SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> closedPartitionIds
+ )
+ {
+ log.info("Marking closed shards in metadata: " + closedPartitionIds);
+ return createDataSourceMetadataWithClosedOrExpiredPartitions(
+ currentMetadata,
+ closedPartitionIds,
+ KinesisSequenceNumber.END_OF_SHARD_MARKER
+ );
+ }
+
+ private SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetadataWithClosedOrExpiredPartitions(
+ SeekableStreamDataSourceMetadata<String, String> currentMetadata,
+ Set<String> terminatedPartitionIds,
+ String terminationMarker
+ )
+ {
final KinesisDataSourceMetadata dataSourceMetadata =
(KinesisDataSourceMetadata) currentMetadata;
SeekableStreamSequenceNumbers<String, String> old =
dataSourceMetadata.getSeekableStreamSequenceNumbers();
@@ -405,10 +419,10 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
Map<String, String> oldPartitionSequenceNumberMap =
old.getPartitionSequenceNumberMap();
Map<String, String> newPartitionSequenceNumberMap = new HashMap<>();
for (Map.Entry<String, String> entry :
oldPartitionSequenceNumberMap.entrySet()) {
- if (!expiredPartitionIds.contains(entry.getKey())) {
+ if (!terminatedPartitionIds.contains(entry.getKey())) {
newPartitionSequenceNumberMap.put(entry.getKey(), entry.getValue());
} else {
- newPartitionSequenceNumberMap.put(entry.getKey(),
KinesisSequenceNumber.EXPIRED_MARKER);
+ newPartitionSequenceNumberMap.put(entry.getKey(), terminationMarker);
}
}
@@ -420,7 +434,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
newExclusiveStartPartitions = new HashSet<>();
oldExclusiveStartPartitions =
((SeekableStreamStartSequenceNumbers<String, String>)
old).getExclusivePartitions();
for (String partitionId : oldExclusiveStartPartitions) {
- if (!expiredPartitionIds.contains(partitionId)) {
+ if (!terminatedPartitionIds.contains(partitionId)) {
newExclusiveStartPartitions.add(partitionId);
}
}
@@ -443,4 +457,5 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
return new KinesisDataSourceMetadata(newSequences);
}
+
}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 292e9dc..5ce8f36 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -104,7 +104,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
public class KinesisSupervisorTest extends EasyMockSupport
@@ -1236,24 +1238,18 @@ public class KinesisSupervisorTest extends
EasyMockSupport
EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
.andReturn(Futures.immediateFuture(ImmutableMap.of(
SHARD_ID1,
- "1",
- SHARD_ID0,
- "0"
+ "1"
)))
.andReturn(Futures.immediateFuture(ImmutableMap.of(
SHARD_ID1,
- "3",
- SHARD_ID0,
- "1"
+ "3"
)));
EasyMock.expect(
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(
SHARD_ID1,
- "3",
- SHARD_ID0,
- "1"
+ "3"
)),
EasyMock.eq(true)
)
@@ -1296,13 +1292,9 @@ public class KinesisSupervisorTest extends
EasyMockSupport
"3",
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1)
);
- Assert.assertEquals(
- "1",
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0)
- );
// start sequenceNumbers should be exclusive for the second batch of
tasks
Assert.assertEquals(
- ImmutableSet.of(SHARD_ID0, SHARD_ID1),
+ ImmutableSet.of(SHARD_ID1),
((KinesisIndexTask)
task).getIOConfig().getStartSequenceNumbers().getExclusivePartitions()
);
}
@@ -3354,8 +3346,11 @@ public class KinesisSupervisorTest extends
EasyMockSupport
public void testGetCurrentTotalStats()
{
supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
+ supervisor.setPartitionIdsForTests(
+ ImmutableList.of(SHARD_ID0, SHARD_ID1)
+ );
supervisor.addTaskGroupToActivelyReadingTaskGroup(
- supervisor.getTaskGroupIdForPartition("0"),
+ supervisor.getTaskGroupIdForPartition(SHARD_ID0),
ImmutableMap.of("0", "0"),
Optional.absent(),
Optional.absent(),
@@ -3364,7 +3359,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
- supervisor.getTaskGroupIdForPartition("1"),
+ supervisor.getTaskGroupIdForPartition(SHARD_ID1),
ImmutableMap.of("0", "0"),
Optional.absent(),
Optional.absent(),
@@ -3906,12 +3901,11 @@ public class KinesisSupervisorTest extends
EasyMockSupport
.anyTimes();
TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
checkpointsGroup0.put(0, ImmutableMap.of(
- SHARD_ID2, "0",
- SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER
+ SHARD_ID1, "0"
));
TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>();
checkpointsGroup1.put(1, ImmutableMap.of(
- SHARD_ID1, "0"
+ SHARD_ID2, "0"
));
// there would be 2 tasks, 1 for each task group
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
EasyMock.anyBoolean()))
@@ -3940,7 +3934,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(
STREAM,
ImmutableMap.of(
- SHARD_ID2, "0"
+ SHARD_ID1, "0"
),
ImmutableSet.of()
);
@@ -3949,7 +3943,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamEndSequenceNumbers<>(
STREAM,
ImmutableMap.of(
- SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+ SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
)
);
@@ -3957,7 +3951,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(
STREAM,
ImmutableMap.of(
- SHARD_ID1, "0"
+ SHARD_ID2, "0"
),
ImmutableSet.of()
);
@@ -3966,7 +3960,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamEndSequenceNumbers<>(
STREAM,
ImmutableMap.of(
- SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+ SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
)
);
@@ -4163,11 +4157,21 @@ public class KinesisSupervisorTest extends
EasyMockSupport
Assert.assertEquals(group1ExpectedStartSequenceNumbers,
group1Config.getStartSequenceNumbers());
Assert.assertEquals(group1ExpectedEndSequenceNumbers,
group1Config.getEndSequenceNumbers());
- Map<Integer, Map<String, String>> expectedPartitionGroups =
ImmutableMap.of(
- 0, ImmutableMap.of(SHARD_ID1, "-1"),
- 1, ImmutableMap.of(SHARD_ID2, "-1")
+ Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of(
+ 0, ImmutableSet.of(SHARD_ID1),
+ 1, ImmutableSet.of(SHARD_ID2)
);
Assert.assertEquals(expectedPartitionGroups,
supervisor.getPartitionGroups());
+
+ ConcurrentHashMap<String, String> expectedPartitionOffsets = new
ConcurrentHashMap<>(
+ ImmutableMap.of(
+ SHARD_ID2, "-1",
+ SHARD_ID1, "-1",
+ SHARD_ID0, "-1"
+ )
+ );
+ Assert.assertEquals(expectedPartitionOffsets,
supervisor.getPartitionOffsets());
+
}
@Test
@@ -4557,11 +4561,19 @@ public class KinesisSupervisorTest extends
EasyMockSupport
Assert.assertEquals(group0ExpectedStartSequenceNumbers,
group0Config.getStartSequenceNumbers());
Assert.assertEquals(group0ExpectedEndSequenceNumbers,
group0Config.getEndSequenceNumbers());
- Map<Integer, Map<String, String>> expectedPartitionGroups =
ImmutableMap.of(
- 0, ImmutableMap.of(SHARD_ID2, "-1"),
- 1, ImmutableMap.of()
+ Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of(
+ 0, ImmutableSet.of(SHARD_ID2),
+ 1, ImmutableSet.of()
+ );
+ ConcurrentHashMap<String, String> expectedPartitionOffsets = new
ConcurrentHashMap<>(
+ ImmutableMap.of(
+ SHARD_ID2, "-1",
+ SHARD_ID1, "-1",
+ SHARD_ID0, "-1"
+ )
);
Assert.assertEquals(expectedPartitionGroups,
supervisor.getPartitionGroups());
+ Assert.assertEquals(expectedPartitionOffsets,
supervisor.getPartitionOffsets());
}
private TestableKinesisSupervisor getTestableSupervisor(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index e1428c6..02cbb92 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -64,13 +64,15 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
private final DataSourceMetadata startMetadata;
@Nullable
private final DataSourceMetadata endMetadata;
+ @Nullable
+ private final String dataSource;
public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish
)
{
- return new SegmentTransactionalInsertAction(segmentsToBeOverwritten,
segmentsToPublish, null, null);
+ return new SegmentTransactionalInsertAction(segmentsToBeOverwritten,
segmentsToPublish, null, null, null);
}
public static SegmentTransactionalInsertAction appendAction(
@@ -79,21 +81,32 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
@Nullable DataSourceMetadata endMetadata
)
{
- return new SegmentTransactionalInsertAction(null, segments, startMetadata,
endMetadata);
+ return new SegmentTransactionalInsertAction(null, segments, startMetadata,
endMetadata, null);
+ }
+
+ public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
+ String dataSource,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ )
+ {
+ return new SegmentTransactionalInsertAction(null, null, startMetadata,
endMetadata, dataSource);
}
@JsonCreator
private SegmentTransactionalInsertAction(
@JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment>
segmentsToBeOverwritten,
- @JsonProperty("segments") Set<DataSegment> segments,
+ @JsonProperty("segments") @Nullable Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata
startMetadata,
- @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
+ @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
+ @JsonProperty("dataSource") @Nullable String dataSource
)
{
this.segmentsToBeOverwritten = segmentsToBeOverwritten;
- this.segments = ImmutableSet.copyOf(segments);
+ this.segments = segments == null ? ImmutableSet.of() :
ImmutableSet.copyOf(segments);
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
+ this.dataSource = dataSource;
}
@JsonProperty
@@ -123,6 +136,13 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
return endMetadata;
}
+ @JsonProperty
+ @Nullable
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
@@ -137,6 +157,24 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
+ final SegmentPublishResult retVal;
+
+ if (segments.isEmpty()) {
+ // A stream ingestion task didn't ingest any rows and created no
segments (e.g., all records were unparseable),
+ // but still needs to update metadata with the progress that the task
made.
+ try {
+ retVal =
toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly(
+ dataSource,
+ startMetadata,
+ endMetadata
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return retVal;
+ }
+
final Set<DataSegment> allSegments = new HashSet<>(segments);
if (segmentsToBeOverwritten != null) {
allSegments.addAll(segmentsToBeOverwritten);
@@ -151,7 +189,6 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
}
}
- final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
@@ -271,6 +308,7 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
", segments=" + segments +
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
+ ", dataSource=" + dataSource +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index a9153b8..3d802ed 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -387,6 +387,8 @@ public class TaskQueue
* Shuts down a task if it has not yet finished.
*
* @param taskId task to kill
+ * @param reasonFormat A format string indicating the shutdown reason
+ * @param args arguments for reasonFormat
*/
public void shutdown(final String taskId, String reasonFormat, Object...
args)
{
@@ -407,6 +409,31 @@ public class TaskQueue
}
/**
+ * Shuts down a task, but records the task status as a success, unlike {@link
#shutdown(String, String, Object...)}
+ *
+ * @param taskId task to shutdown
+ * @param reasonFormat A format string indicating the shutdown reason
+ * @param args arguments for reasonFormat
+ */
+ public void shutdownWithSuccess(final String taskId, String reasonFormat,
Object... args)
+ {
+ giant.lock();
+
+ try {
+ Preconditions.checkNotNull(taskId, "taskId");
+ for (final Task task : tasks) {
+ if (task.getId().equals(taskId)) {
+ notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args);
+ break;
+ }
+ }
+ }
+ finally {
+ giant.unlock();
+ }
+ }
+
+ /**
* Notify this queue that some task has an updated status. If this update is
valid, the status will be persisted in
* the task storage facility. If the status is a completed status, the task
will be unlocked and no further
* updates will be accepted.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 2b23706..4b0265f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -27,11 +27,16 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.Committer;
import org.apache.druid.indexing.common.TaskToolbox;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,6 +47,8 @@ import java.util.function.BiFunction;
public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
{
+ private static final EmittingLogger log = new
EmittingLogger(SequenceMetadata.class);
+
private final int sequenceId;
private final String sequenceName;
private final Set<PartitionIdType> exclusiveStartPartitions;
@@ -301,7 +308,38 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
boolean useTransaction
)
{
- return (mustBeNullOrEmptySegments, segmentsToPush, commitMetadata) -> {
+ return new SequenceMetadataTransactionalSegmentPublisher(
+ runner,
+ toolbox,
+ useTransaction
+ );
+ }
+
+ private class SequenceMetadataTransactionalSegmentPublisher
+ implements TransactionalSegmentPublisher
+ {
+ private final SeekableStreamIndexTaskRunner<PartitionIdType,
SequenceOffsetType> runner;
+ private final TaskToolbox toolbox;
+ private final boolean useTransaction;
+
+ public SequenceMetadataTransactionalSegmentPublisher(
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
runner,
+ TaskToolbox toolbox,
+ boolean useTransaction
+ )
+ {
+ this.runner = runner;
+ this.toolbox = toolbox;
+ this.useTransaction = useTransaction;
+ }
+
+ @Override
+ public SegmentPublishResult publishAnnotatedSegments(
+ @Nullable Set<DataSegment> mustBeNullOrEmptySegments,
+ Set<DataSegment> segmentsToPush,
+ @Nullable Object commitMetadata
+ ) throws IOException
+ {
if (mustBeNullOrEmptySegments != null &&
!mustBeNullOrEmptySegments.isEmpty()) {
throw new ISE("WTH? stream ingestion tasks are overwriting
segments[%s]", mustBeNullOrEmptySegments);
}
@@ -316,14 +354,43 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
if
(!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
throw new ISE(
"WTF?! Driver for sequence [%s], attempted to publish invalid
metadata[%s].",
- toString(),
+ SequenceMetadata.this.toString(),
commitMetadata
);
}
final SegmentTransactionalInsertAction action;
- if (useTransaction) {
+ if (segmentsToPush.isEmpty()) {
+ // If a task ingested no data but made progress reading through its
assigned partitions,
+ // we publish no segments but still need to update the supervisor with
the current offsets
+ SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
startPartitions =
+ new SeekableStreamStartSequenceNumbers<>(
+ finalPartitions.getStream(),
+ getStartOffsets(),
+ exclusiveStartPartitions
+ );
+ if (isMetadataUnchanged(startPartitions, finalPartitions)) {
+ // if we created no segments and didn't change any offsets, just do
nothing and return.
+ log.info(
+ "With empty segment set, start offsets [%s] and end offsets [%s]
are the same, skipping metadata commit.",
+ startPartitions,
+ finalPartitions
+ );
+ return SegmentPublishResult.ok(segmentsToPush);
+ } else {
+ log.info(
+ "With empty segment set, start offsets [%s] and end offsets [%s]
changed, committing new metadata.",
+ startPartitions,
+ finalPartitions
+ );
+ action = SegmentTransactionalInsertAction.commitMetadataOnlyAction(
+ runner.getAppenderator().getDataSource(),
+ runner.createDataSourceMetadata(startPartitions),
+ runner.createDataSourceMetadata(finalPartitions)
+ );
+ }
+ } else if (useTransaction) {
action = SegmentTransactionalInsertAction.appendAction(
segmentsToPush,
runner.createDataSourceMetadata(
@@ -340,6 +407,22 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
}
return toolbox.getTaskActionClient().submit(action);
- };
+ }
+
+ @Override
+ public boolean supportsEmptyPublish()
+ {
+ return true;
+ }
+ }
+
+ private boolean isMetadataUnchanged(
+ SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
startSequenceNumbers,
+ SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
endSequenceNumbers
+ )
+ {
+ Map<PartitionIdType, SequenceOffsetType> startMap =
startSequenceNumbers.getPartitionSequenceNumberMap();
+ Map<PartitionIdType, SequenceOffsetType> endMap =
endSequenceNumbers.getPartitionSequenceNumberMap();
+ return startMap.equals(endMap);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 5a3db11..90d95b2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -432,17 +432,18 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Map<{group RandomIdUtils}, List<{pending completion task groups}>>
private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>>
pendingCompletionTaskGroups = new ConcurrentHashMap<>();
- // The starting sequence for a new partition in [partitionGroups] is
initially set to getNotSetMarker(). When a new task group
- // is created and is assigned partitions, if the sequence in
[partitionGroups] is getNotSetMarker() it will take the starting
- // sequence value from the metadata store, and if it can't find it there,
from stream. Once a task begins
- // publishing, the sequence in partitionGroups will be updated to the ending
sequence of the publishing-but-not-yet-
+ // We keep two separate maps for tracking the current state of
partition->task group mappings [partitionGroups] and partition->offset
+ // mappings [partitionOffsets]. The starting offset for a new partition in
[partitionOffsets] is initially set to getNotSetMarker(). When a new task group
+ // is created and is assigned partitions, if the offset for an assigned
partition in [partitionOffsets] is getNotSetMarker() it will take the starting
+ // offset value from the metadata store, and if it can't find it there, from
stream. Once a task begins
+ // publishing, the offset in [partitionOffsets] will be updated to the
ending offset of the publishing-but-not-yet-
// completed task, which will cause the next set of tasks to begin reading
from where the previous task left
- // off. If that previous task now fails, we will set the sequence in
[partitionGroups] back to getNotSetMarker() which will
- // cause successive tasks to again grab their starting sequence from
metadata store. This mechanism allows us to
+ // off. If that previous task now fails, we will set the offset in
[partitionOffsets] back to getNotSetMarker() which will
+ // cause successive tasks to again grab their starting offset from metadata
store. This mechanism allows us to
// start up successive tasks without waiting for the previous tasks to
succeed and still be able to handle task
// failures during publishing.
- // Map<{group RandomIdUtils}, Map<{partition RandomIdUtils},
{startingOffset}>>
- protected final ConcurrentHashMap<Integer,
ConcurrentHashMap<PartitionIdType, SequenceOffsetType>> partitionGroups = new
ConcurrentHashMap<>();
+ protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType>
partitionOffsets = new ConcurrentHashMap<>();
+ protected final ConcurrentHashMap<Integer, Set<PartitionIdType>>
partitionGroups = new ConcurrentHashMap<>();
protected final ObjectMapper sortingMapper;
protected final List<PartitionIdType> partitionIds = new
CopyOnWriteArrayList<>();
@@ -801,7 +802,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
boolean includeOffsets
)
{
- int numPartitions =
partitionGroups.values().stream().mapToInt(Map::size).sum();
+ int numPartitions =
partitionGroups.values().stream().mapToInt(Set::size).sum();
final SeekableStreamSupervisorReportPayload<PartitionIdType,
SequenceOffsetType> payload = createReportPayload(
numPartitions,
@@ -1128,6 +1129,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
));
activelyReadingTaskGroups.clear();
partitionGroups.clear();
+ partitionOffsets.clear();
} else {
if (!checkSourceMetadataMatch(dataSourceMetadata)) {
throw new IAE(
@@ -1211,8 +1213,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
activelyReadingTaskGroups.remove(groupId);
// killTaskGroupForPartitions() cleans up
partitionGroups.
// Add the removed groups back.
- partitionGroups.computeIfAbsent(groupId, k -> new
ConcurrentHashMap<>())
- .put(partition, getNotSetMarker());
+ partitionGroups.computeIfAbsent(groupId, k -> new
HashSet<>());
+ partitionOffsets.put(partition, getNotSetMarker());
});
} else {
throw new ISE("Unable to reset metadata");
@@ -1237,6 +1239,16 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ /**
+  * Stops task {@code id} through {@link TaskQueue#shutdownWithSuccess}, passing the formatted reason along
+  * (presumably recording the task as successful rather than failed -- confirm against TaskQueue).
+  * Currently used to stop tasks whose assigned partitions include partitions no longer tracked in partitionIds.
+  * <p>
+  * If taskMaster.getTaskQueue() is absent (the log message attributes this to this node not being the leader),
+  * no shutdown is requested and only an error is logged.
+  *
+  * @param id           ID of the task to shut down
+  * @param reasonFormat format string describing why the task is being stopped
+  * @param args         arguments for {@code reasonFormat}
+  */ private void killTaskWithSuccess(final String id, String reasonFormat,
Object... args)
+ {
+ Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
+ if (taskQueue.isPresent()) {
+ taskQueue.get().shutdownWithSuccess(id, reasonFormat, args);
+ } else {
+ log.error("Failed to get task queue because I'm not the leader!");
+ }
+ }
+
private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat,
Object... args)
{
if (taskGroup != null) {
@@ -1287,6 +1299,31 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
seekableStreamIndexTask = (SeekableStreamIndexTask<PartitionIdType,
SequenceOffsetType>) task;
final String taskId = task.getId();
+ // Check if the task has any inactive partitions. If so, terminate the
task. Even if some of the
+ // partitions assigned to the task are still active, we still terminate
the task. We terminate such tasks early
+ // to more rapidly ensure that all active partitions are evenly
distributed and being read, and to avoid
+ // having to map expired partitions which are no longer tracked in
partitionIds to a task group.
+ if (supportsPartitionExpiration()) {
+ Set<PartitionIdType> taskPartitions =
seekableStreamIndexTask.getIOConfig()
+
.getStartSequenceNumbers()
+
.getPartitionSequenceNumberMap()
+ .keySet();
+ Set<PartitionIdType> inactivePartitionsInTask = Sets.difference(
+ taskPartitions,
+ new HashSet<>(partitionIds)
+ );
+ if (!inactivePartitionsInTask.isEmpty()) {
+ killTaskWithSuccess(
+ taskId,
+ "Task [%s] with partition set [%s] has inactive partitions [%s],
stopping task.",
+ taskId,
+ taskPartitions,
+ inactivePartitionsInTask
+ );
+ continue;
+ }
+ }
+
// Determine which task group this task belongs to based on one of the
partitions handled by this task. If we
// later determine that this task is actively reading, we will make sure
that it matches our current partition
// allocation (getTaskGroupIdForPartition(partition) should return the
same value for every partition being read
@@ -1338,24 +1375,40 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Map<PartitionIdType, SequenceOffsetType>
publishingTaskEndOffsets = taskClient.getEndOffsets(
taskId);
+ // If we received invalid endOffset values, we clear
the known offset to refetch the last committed offset
+ // from metadata. If any endOffset values are
invalid, we treat the entire set as invalid as a safety measure.
+ boolean endOffsetsAreInvalid = false;
for (Entry<PartitionIdType, SequenceOffsetType>
entry : publishingTaskEndOffsets.entrySet()) {
PartitionIdType partition = entry.getKey();
SequenceOffsetType sequence = entry.getValue();
- ConcurrentHashMap<PartitionIdType,
SequenceOffsetType> partitionOffsets = partitionGroups.get(
- getTaskGroupIdForPartition(partition)
- );
+ if (sequence.equals(getEndOfPartitionMarker())) {
+ log.info(
+ "Got end of partition marker for partition
[%s] from task [%s] in discoverTasks, clearing partition offset to refetch from
metadata..",
+ taskId,
+ partition
+ );
+ endOffsetsAreInvalid = true;
+ partitionOffsets.put(partition,
getNotSetMarker());
+ }
+ }
- boolean succeeded;
- do {
- succeeded = true;
- SequenceOffsetType previousOffset =
partitionOffsets.putIfAbsent(partition, sequence);
- if (previousOffset != null
- && (makeSequenceNumber(previousOffset)
- .compareTo(makeSequenceNumber(
- sequence))) < 0) {
- succeeded =
partitionOffsets.replace(partition, previousOffset, sequence);
- }
- } while (!succeeded);
+ if (!endOffsetsAreInvalid) {
+ for (Entry<PartitionIdType, SequenceOffsetType>
entry : publishingTaskEndOffsets.entrySet()) {
+ PartitionIdType partition = entry.getKey();
+ SequenceOffsetType sequence = entry.getValue();
+
+ boolean succeeded;
+ do {
+ succeeded = true;
+ SequenceOffsetType previousOffset =
partitionOffsets.putIfAbsent(partition, sequence);
+ if (previousOffset != null
+ && (makeSequenceNumber(previousOffset)
+ .compareTo(makeSequenceNumber(
+ sequence))) < 0) {
+ succeeded =
partitionOffsets.replace(partition, previousOffset, sequence);
+ }
+ } while (!succeeded);
+ }
}
} else {
for (PartitionIdType partition :
seekableStreamIndexTask.getIOConfig()
@@ -1584,6 +1637,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final SortedMap<Integer, Map<PartitionIdType, SequenceOffsetType>>
latestCheckpoints = new TreeMap<>(
taskCheckpoints.tailMap(earliestConsistentSequenceId.get())
);
+
log.info("Setting taskGroup sequences to [%s] for group [%d]",
latestCheckpoints, groupId);
taskGroup.checkpointSequences.clear();
taskGroup.checkpointSequences.putAll(latestCheckpoints);
@@ -1622,7 +1676,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// clear state about the taskgroup so that get latest sequence
information is fetched from metadata store
log.warn("Clearing task group [%d] information as no valid tasks left
the group", groupId);
activelyReadingTaskGroups.remove(groupId);
- partitionGroups.get(groupId).replaceAll((partition, sequence) ->
getNotSetMarker());
+ for (PartitionIdType partitionId : taskGroup.startingSequences.keySet())
{
+ partitionOffsets.put(partitionId, getNotSetMarker());
+ }
}
taskSequences.stream().filter(taskIdSequences ->
tasksToKill.contains(taskIdSequences.lhs)).forEach(
@@ -1863,6 +1919,18 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
previouslyExpiredPartitions
);
+ Set<PartitionIdType> activePartitionsIdsFromSupplier = Sets.difference(
+ partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions,
+ closedPartitions
+ );
+
+ Set<PartitionIdType> newlyClosedPartitions = Sets.intersection(
+ closedPartitions,
+ new HashSet<>(previousPartitionIds)
+ );
+
+ log.debug("active partitions from supplier: " +
activePartitionsIdsFromSupplier);
+
if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() !=
partitionIdsFromSupplier.size()) {
// this should never happen, but we check for it and exclude the expired
partitions if they somehow reappear
log.warn(
@@ -1871,9 +1939,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
partitionIdsFromSupplier
);
}
- if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() ==
0) {
+ if (activePartitionsIdsFromSupplier.size() == 0) {
String errMsg = StringUtils.format(
- "No partitions found for stream [%s] after removing previously
expired partitions",
+ "No active partitions found for stream [%s] after removing closed
and previously expired partitions",
ioConfig.getStream()
);
stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg)));
@@ -1887,29 +1955,38 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("partition [%s] is closed and has no more data, skipping.",
partitionId);
continue;
}
- if (!initialPartitionDiscovery &&
!this.partitionIds.contains(partitionId)) {
- subsequentlyDiscoveredPartitions.add(partitionId);
+
+ if (!this.partitionIds.contains(partitionId)) {
+ partitionIds.add(partitionId);
+
+ if (!initialPartitionDiscovery) {
+ subsequentlyDiscoveredPartitions.add(partitionId);
+ }
}
}
-
- // When partitions expire, we need to recompute the task group
assignments, considering only non-expired partitions,
- // to ensure that we have even distribution of readable partitions across
tasks.
+ // When partitions expire, we need to recompute the task group
assignments, considering only
+ // non-closed and non-expired partitions, to ensure that we have even
distribution of active
+ // partitions across tasks.
if (supportsPartitionExpiration()) {
- cleanupExpiredPartitions(
+ cleanupClosedAndExpiredPartitions(
storedPartitions,
+ newlyClosedPartitions,
+ activePartitionsIdsFromSupplier,
previouslyExpiredPartitions,
partitionIdsFromSupplier
);
}
- for (PartitionIdType partitionId :
partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) {
+ for (PartitionIdType partitionId : activePartitionsIdsFromSupplier) {
int taskGroupId = getTaskGroupIdForPartition(partitionId);
- ConcurrentHashMap<PartitionIdType, SequenceOffsetType> partitionMap =
partitionGroups.computeIfAbsent(
+ Set<PartitionIdType> partitionGroup = partitionGroups.computeIfAbsent(
taskGroupId,
- k -> new ConcurrentHashMap<>()
+ k -> new HashSet<>()
);
- if (partitionMap.putIfAbsent(partitionId, getNotSetMarker()) == null) {
+ partitionGroup.add(partitionId);
+
+ if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) ==
null) {
log.info(
"New partition [%s] discovered for stream [%s], added to task
group [%d]",
partitionId,
@@ -1952,16 +2029,19 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* <p>
* It will mark the expired partitions in metadata and recompute the
partition->task group mappings, updating
* the metadata, the partitionIds list, and the partitionGroups mappings.
- * <p>
- * Note that partition IDs that were newly discovered (appears in record
supplier set but not in metadata set)
- * are not added to the recomputed partition groups here. This is handled
later in
- * {@link #updatePartitionDataFromStream} after this method is called.
*
- * @param storedPartitions Set of partitions previously tracked,
from the metadata store
+ * @param storedPartitions Set of partitions previously tracked, from the
metadata store
+ * @param newlyClosedPartitions Set of partitions that are closed in the
metadata store but still present in the
+ * current {@link #partitionIds}
+ * @param activePartitionsIdsFromSupplier Set of partitions currently
returned by the record supplier, but with
+ * any partitions that are
closed/expired in the metadata store removed
+ * @param previouslyExpiredPartitions Set of partitions that are recorded as
expired in the metadata store
* @param partitionIdsFromSupplier Set of partitions currently returned by
the record supplier.
*/
- private void cleanupExpiredPartitions(
+ private void cleanupClosedAndExpiredPartitions(
Set<PartitionIdType> storedPartitions,
+ Set<PartitionIdType> newlyClosedPartitions,
+ Set<PartitionIdType> activePartitionsIdsFromSupplier,
Set<PartitionIdType> previouslyExpiredPartitions,
Set<PartitionIdType> partitionIdsFromSupplier
)
@@ -1971,7 +2051,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Set<PartitionIdType> newlyExpiredPartitions =
Sets.difference(storedPartitions, previouslyExpiredPartitions);
newlyExpiredPartitions = Sets.difference(newlyExpiredPartitions,
partitionIdsFromSupplier);
- if (newlyExpiredPartitions.size() > 0) {
+ if (!newlyExpiredPartitions.isEmpty()) {
log.info("Detected newly expired partitions: " + newlyExpiredPartitions);
// Mark partitions as expired in metadata
@@ -1987,28 +2067,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
validateMetadataPartitionExpiration(newlyExpiredPartitions,
currentMetadata, cleanedMetadata);
- // Compute new partition groups, only including partitions that are
- // still in partitionIdsFromSupplier
- Map<Integer, ConcurrentHashMap<PartitionIdType, SequenceOffsetType>>
newPartitionGroups =
- recomputePartitionGroupsForExpiration(partitionIdsFromSupplier);
-
- validatePartitionGroupReassignments(newPartitionGroups);
-
- log.info("New partition groups after partition expiration: " +
newPartitionGroups);
-
try {
boolean success =
indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource,
cleanedMetadata);
if (success) {
- partitionIds.clear();
- partitionIds.addAll(partitionIdsFromSupplier);
- for (Integer groupId : partitionGroups.keySet()) {
- if (newPartitionGroups.containsKey(groupId)) {
- partitionGroups.put(groupId, newPartitionGroups.get(groupId));
- } else {
- partitionGroups.put(groupId, new ConcurrentHashMap<>());
- }
- }
} else {
log.error("Failed to update datasource metadata[%s] with expired
partitions removed", cleanedMetadata);
}
@@ -2017,6 +2079,33 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
throw new RuntimeException(ioe);
}
}
+
+ if (!newlyClosedPartitions.isEmpty()) {
+ log.info("Detected newly closed partitions: " + newlyClosedPartitions);
+ }
+
+ // Partitions have been dropped
+ if (!newlyClosedPartitions.isEmpty() || !newlyExpiredPartitions.isEmpty())
{
+ // Compute new partition groups, only including partitions that are
+ // still in partitionIdsFromSupplier and not closed
+ Map<Integer, Set<PartitionIdType>> newPartitionGroups =
+
recomputePartitionGroupsForExpiration(activePartitionsIdsFromSupplier);
+
+ validatePartitionGroupReassignments(activePartitionsIdsFromSupplier,
newPartitionGroups);
+
+ log.info("New partition groups after removing closed and expired
partitions: " + newPartitionGroups);
+
+ partitionIds.clear();
+ partitionIds.addAll(activePartitionsIdsFromSupplier);
+
+ for (Integer groupId : partitionGroups.keySet()) {
+ if (newPartitionGroups.containsKey(groupId)) {
+ partitionGroups.put(groupId, newPartitionGroups.get(groupId));
+ } else {
+ partitionGroups.put(groupId, new HashSet<>());
+ }
+ }
+ }
}
/**
@@ -2032,7 +2121,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* @param availablePartitions
* @return a remapped copy of partitionGroups, containing only the
partitions in availablePartitions
*/
- protected Map<Integer, ConcurrentHashMap<PartitionIdType,
SequenceOffsetType>> recomputePartitionGroupsForExpiration(
+ protected Map<Integer, Set<PartitionIdType>>
recomputePartitionGroupsForExpiration(
Set<PartitionIdType> availablePartitions
)
{
@@ -2056,6 +2145,14 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
throw new UnsupportedOperationException("This supervisor type does not
support partition expiration.");
}
+ /**
+  * Hook for supervisors that support closed partitions. The base implementation always throws
+  * {@link UnsupportedOperationException}; subclasses that support partition closing must override it.
+  * <p>
+  * NOTE(review): presumably returns a copy of {@code currentMetadata} adjusted for {@code closedPartitionIds},
+  * mirroring createDataSourceMetadataWithExpiredPartitions -- confirm against the overriding supervisors.
+  *
+  * @param currentMetadata    datasource metadata currently stored for this supervisor
+  * @param closedPartitionIds partitions that have been closed
+  */ protected SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType> createDataSourceMetadataWithClosedPartitions(
+ SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>
currentMetadata,
+ Set<PartitionIdType> closedPartitionIds
+ )
+ {
+ throw new UnsupportedOperationException("This supervisor type does not
support partition closing.");
+ }
+
/**
* Perform a sanity check on the datasource metadata returned by
* {@link #createDataSourceMetadataWithExpiredPartitions}.
@@ -2123,33 +2220,18 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* @param newPartitionGroups new metadata without expired partitions,
generated by the subclass
*/
private void validatePartitionGroupReassignments(
- Map<Integer, ConcurrentHashMap<PartitionIdType, SequenceOffsetType>>
newPartitionGroups
+ Set<PartitionIdType> activePartitionsIdsFromSupplier,
+ Map<Integer, Set<PartitionIdType>> newPartitionGroups
)
{
- Map<PartitionIdType, SequenceOffsetType> oldPartitionMappings = new
HashMap<>();
- for (ConcurrentHashMap<PartitionIdType, SequenceOffsetType> oldGroup :
partitionGroups.values()) {
- // we don't care about old task group mappings, only the
partition-offset mappings
- oldPartitionMappings.putAll(oldGroup);
- }
-
- for (ConcurrentHashMap<PartitionIdType, SequenceOffsetType> newGroup :
newPartitionGroups.values()) {
- for (Entry<PartitionIdType, SequenceOffsetType> newPartitionMapping :
newGroup.entrySet()) {
- if (!oldPartitionMappings.containsKey(newPartitionMapping.getKey())) {
+ for (Set<PartitionIdType> newGroup : newPartitionGroups.values()) {
+ for (PartitionIdType partitionInNewGroup : newGroup) {
+ if (!activePartitionsIdsFromSupplier.contains(partitionInNewGroup)) {
// recomputing the groups without the expired partitions added an
unknown partition somehow
throw new IAE(
"Recomputed partition groups [%s] contains unexpected partition
ID [%s], old partition groups: [%s]",
newPartitionGroups,
- newPartitionMapping.getKey(),
- partitionGroups
- );
- }
-
- SequenceOffsetType oldOffset =
oldPartitionMappings.get(newPartitionMapping.getKey());
- if (!oldOffset.equals(newPartitionMapping.getValue())) {
- throw new IAE(
- "Recomputed partition groups [%s] has offset mismatch for
partition ID [%s], original partition map: [%s]",
- newPartitionGroups,
- newPartitionMapping.getKey(),
+ partitionInNewGroup,
partitionGroups
);
}
@@ -2271,9 +2353,29 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
group.completionTimeout =
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new
CopyOnWriteArrayList<>()).add(group);
- // set endOffsets as the next startOffsets
+
+ boolean endOffsetsAreInvalid = false;
for (Entry<PartitionIdType, SequenceOffsetType> entry :
endOffsets.entrySet()) {
- partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
+ if (entry.getValue().equals(getEndOfPartitionMarker())) {
+ log.info(
+ "Got end of partition marker for partition [%s] in
checkTaskDuration, not updating partition offset.",
+ entry.getKey()
+ );
+ endOffsetsAreInvalid = true;
+ }
+ }
+
+ // set endOffsets as the next startOffsets
+ // If we received invalid endOffset values, we clear the known offset
to refetch the last committed offset
+ // from metadata. If any endOffset values are invalid, we treat the
entire set as invalid as a safety measure.
+ if (!endOffsetsAreInvalid) {
+ for (Entry<PartitionIdType, SequenceOffsetType> entry :
endOffsets.entrySet()) {
+ partitionOffsets.put(entry.getKey(), entry.getValue());
+ }
+ } else {
+ for (Entry<PartitionIdType, SequenceOffsetType> entry :
endOffsets.entrySet()) {
+ partitionOffsets.put(entry.getKey(), getNotSetMarker());
+ }
}
} else {
for (String id : group.taskIds()) {
@@ -2285,7 +2387,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
// clear partitionGroups, so that latest sequences from db is used as
start sequences not the stale ones
// if tasks did some successful incremental handoffs
- partitionGroups.get(groupId).replaceAll((partition, sequence) ->
getNotSetMarker());
+ for (PartitionIdType partitionId : group.startingSequences.keySet()) {
+ partitionOffsets.put(partitionId, getNotSetMarker());
+ }
}
// remove this task group from the list of current task groups now that
it has been handled
@@ -2560,7 +2664,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
// reset partitions sequences for this task group so that they will
be re-read from metadata storage
- partitionGroups.get(groupId).replaceAll((partition, sequence) ->
getNotSetMarker());
+ for (PartitionIdType partitionId : group.startingSequences.keySet())
{
+ partitionOffsets.put(partitionId, getNotSetMarker());
+ }
+
// kill all the tasks in this pending completion group
killTasksInGroup(
group,
@@ -2671,7 +2778,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// check that there is a current task group for each group of partitions
in [partitionGroups]
for (Integer groupId : partitionGroups.keySet()) {
if (!activelyReadingTaskGroups.containsKey(groupId)) {
- log.info("Creating new task group [%d] for partitions %s", groupId,
partitionGroups.get(groupId).keySet());
+ log.info("Creating new task group [%d] for partitions %s", groupId,
partitionGroups.get(groupId));
Optional<DateTime> minimumMessageTime;
if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
minimumMessageTime =
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
@@ -2801,22 +2908,21 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
)
{
ImmutableMap.Builder<PartitionIdType,
OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
- for (Entry<PartitionIdType, SequenceOffsetType> entry :
partitionGroups.get(groupId).entrySet()) {
- PartitionIdType partition = entry.getKey();
- SequenceOffsetType sequence = entry.getValue();
+ for (PartitionIdType partitionId : partitionGroups.get(groupId)) {
+ SequenceOffsetType sequence = partitionOffsets.get(partitionId);
if (!getNotSetMarker().equals(sequence)) {
// if we are given a startingOffset (set by a previous task group
which is pending completion) then use it
if (!isEndOfShard(sequence)) {
- builder.put(partition, makeSequenceNumber(sequence,
useExclusiveStartSequenceNumberForNonFirstSequence()));
+ builder.put(partitionId, makeSequenceNumber(sequence,
useExclusiveStartSequenceNumberForNonFirstSequence()));
}
} else {
// if we don't have a startingOffset (first run or we had some
previous failures and reset the sequences) then
// get the sequence from metadata storage (if available) or
Kafka/Kinesis (otherwise)
- OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage =
getOffsetFromStorageForPartition(partition);
+ OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage =
getOffsetFromStorageForPartition(partitionId);
if (offsetFromStorage != null) {
- builder.put(partition, offsetFromStorage);
+ builder.put(partitionId, offsetFromStorage);
}
}
}
@@ -3131,7 +3237,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@VisibleForTesting
- public ConcurrentHashMap<Integer, ConcurrentHashMap<PartitionIdType,
SequenceOffsetType>> getPartitionGroups()
+ public ConcurrentHashMap<Integer, Set<PartitionIdType>> getPartitionGroups()
{
return partitionGroups;
}
@@ -3142,6 +3248,23 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return this.partitionIds.isEmpty();
}
+ /**
+  * Returns the supervisor's live partition-to-offset map: the field itself, not a copy, so callers both observe
+  * and can mutate supervisor state. Values may be getNotSetMarker(), meaning the starting offset must be
+  * re-read from metadata storage or the stream.
+  * <p>
+  * NOTE(review): unlike getPartitionGroups() this accessor is not annotated {@code @VisibleForTesting};
+  * presumably it exists for tests as well -- confirm.
+  */ public ConcurrentHashMap<PartitionIdType, SequenceOffsetType>
getPartitionOffsets()
+ {
+ return partitionOffsets;
+ }
+
+ /**
+  * Replaces the contents of {@code partitionIds} with {@code partitionIdsForTests}.
+  * Should never be called outside of tests.
+  * <p>
+  * The list is cleared and then refilled as two separate calls, so a concurrent reader may briefly observe an
+  * empty or partially filled list.
+  *
+  * @param partitionIdsForTests partition IDs the supervisor should consider active (consulted, for example,
+  *                             when checking a task's assigned partitions for inactive ones)
+  */
+ @VisibleForTesting
+ public void setPartitionIdsForTests(
+ List<PartitionIdType> partitionIdsForTests
+ )
+ {
+ partitionIds.clear();
+ partitionIds.addAll(partitionIdsForTests);
+ }
+
/**
* creates a specific task IOConfig instance for Kafka/Kinesis
*
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index d270ea8..0a370d5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -119,6 +119,16 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
}
@Override
+ public SegmentPublishResult commitMetadataOnly(
+ String dataSource,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ )
+ { // Deliberate stub: this test coordinator always throws; implement it if a test starts needing metadata-only commits.
+ throw new UnsupportedOperationException("Not implemented, no test uses
this currently.");
+ }
+
+ @Override
public SegmentIdWithShardSpec allocatePendingSegment(
String dataSource,
String sequenceName,
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 97205a8..356223a 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -180,6 +180,33 @@ public interface IndexerMetadataStorageCoordinator
) throws IOException;
/**
+ * Similar to {@link #announceHistoricalSegments(Set)}, but meant for streaming ingestion tasks, to handle the
+ * case where the task ingested no records and created no segments but still needs to record the progress it
+ * made in the datasource metadata. No segments are published by this method.
+ * <p>
+ * The metadata should undergo the same validation checks as performed by announceHistoricalSegments.
+ *
+ * @param dataSource    the datasource
+ * @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
+ *                      {@link DataSourceMetadata#matches(DataSourceMetadata)}.
+ * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
+ *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}.
+ *
+ * @return segment publish result indicating transaction success or failure.
+ * This method must only return a failure code if it is sure that the transaction did not happen. If it is not
+ * sure, it must throw an exception instead.
+ *
+ * @throws IllegalArgumentException if dataSource, startMetadata, or endMetadata is null
+ * @throws RuntimeException         if the state of metadata storage after this call is unknown
+ */
+ SegmentPublishResult commitMetadataOnly(
+ String dataSource,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ );
+
+ /**
* Read dataSource metadata. Returns null if there is no metadata.
*/
DataSourceMetadata getDataSourceMetadata(String dataSource);
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index dbb690e..da8a783 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -354,6 +354,74 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
@Override
+ public SegmentPublishResult commitMetadataOnly(
+ String dataSource,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ )
+ { // Runs updateDataSourceMetadataWithHandle(start -> end) in a retried transaction without inserting any segments.
+ if (dataSource == null) {
+ throw new IllegalArgumentException("datasource name cannot be null");
+ }
+ if (startMetadata == null) {
+ throw new IllegalArgumentException("start metadata cannot be null");
+ }
+ if (endMetadata == null) {
+ throw new IllegalArgumentException("end metadata cannot be null");
+ }
+ // Set by the callback when it deliberately aborts; read after retryTransaction fails to decide fail-vs-throw.
+ final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
+
+ try {
+ return connector.retryTransaction(
+ new TransactionCallback<SegmentPublishResult>()
+ {
+ @Override
+ public SegmentPublishResult inTransaction(
+ final Handle handle,
+ final TransactionStatus transactionStatus
+ ) throws Exception
+ {
+ // Set definitelyNotUpdated back to false upon retrying.
+ definitelyNotUpdated.set(false);
+
+ final DataSourceMetadataUpdateResult result =
updateDataSourceMetadataWithHandle(
+ handle,
+ dataSource,
+ startMetadata,
+ endMetadata
+ );
+ // Any non-SUCCESS result: roll back, remember that nothing was written, and abort via an exception.
+ if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+ // Metadata was definitely not updated.
+ transactionStatus.setRollbackOnly();
+ definitelyNotUpdated.set(true);
+ // FAILURE aborts outright; TRY_AGAIN throws RetryTransactionException, presumably so retryTransaction retries.
+ if (result == DataSourceMetadataUpdateResult.FAILURE) {
+ throw new RuntimeException("Aborting transaction!");
+ } else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN)
{
+ throw new RetryTransactionException("Aborting transaction!");
+ }
+ }
+ // A metadata-only commit publishes no segments, hence the empty segment set.
+ return SegmentPublishResult.ok(ImmutableSet.of());
+ }
+ },
+ 3, // NOTE(review): presumably the quiet-retry count, with DEFAULT_MAX_TRIES as the max tries -- confirm signature.
+ SQLMetadataConnector.DEFAULT_MAX_TRIES
+ );
+ }
+ catch (CallbackFailedException e) {
+ if (definitelyNotUpdated.get()) { // The callback rolled back on purpose, so reporting failure is safe.
+ return SegmentPublishResult.fail(e.getMessage());
+ } else {
+ // Must throw exception if we are not sure if we updated or not.
+ throw e;
+ }
+ }
+ }
+
+ @Override
public SegmentIdWithShardSpec allocatePendingSegment(
final String dataSource,
final String sequenceName,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 0fdac39..eeb3269 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -551,10 +551,22 @@ public abstract class BaseAppenderatorDriver implements
Closeable
)
{
if (segmentsAndMetadata.getSegments().isEmpty()) {
- log.debug("Nothing to publish, skipping publish step.");
- final SettableFuture<SegmentsAndMetadata> retVal =
SettableFuture.create();
- retVal.set(segmentsAndMetadata);
- return retVal;
+ if (!publisher.supportsEmptyPublish()) {
+ log.info("Nothing to publish, skipping publish step.");
+ final SettableFuture<SegmentsAndMetadata> retVal =
SettableFuture.create();
+ retVal.set(segmentsAndMetadata);
+ return retVal;
+ } else {
+ // Sanity check: if we have no segments to publish, but the
appenderator did ingest > 0 valid rows,
+ // something is wrong. This check could be expanded to cover
publishers that return false for
+ // supportsEmptyPublish, but is kept limited for now until further
testing.
+ if (appenderator.getTotalRowCount() != 0) {
+ throw new ISE(
+ "Attempting to publish with empty segment set, but total row
count was not 0: [%s].",
+ appenderator.getTotalRowCount()
+ );
+ }
+ }
}
final Object metadata = segmentsAndMetadata.getCommitMetadata();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 8ebc662..cb9b9ff 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -68,6 +68,15 @@ public interface TransactionalSegmentPublisher
);
}
+ /**
+  * Whether this publisher still needs to be invoked when there are no segments to publish.
+  * <p>
+  * When this returns false, {@code BaseAppenderatorDriver} skips the publish step for an empty segment set.
+  * When it returns true, the driver does not return early: it first checks that the appenderator ingested no
+  * rows and then continues with publishing. This lets a publisher record progress (presumably stream offsets,
+  * via {@code IndexerMetadataStorageCoordinator#commitMetadataOnly}) even though no segments were created.
+  * The publisher used by the seekable stream tasks is an example where this is true.
+  *
+  * @return true if this publisher has work to do when publishing an empty segment set; false by default
+  */
+ default boolean supportsEmptyPublish()
+ {
+ return false;
+ }
+
static Set<DataSegment> annotateAtomicUpdateGroupSize(Set<DataSegment>
segments)
{
final Map<Interval, List<DataSegment>> intervalToSegments = new
HashMap<>();
diff --git a/website/.spelling b/website/.spelling
index a38cf47..0be8bb4 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -670,6 +670,8 @@ taskDuration
9.2dist
KinesisSupervisorIOConfig
KinesisSupervisorTuningConfig
+Resharding
+resharding
LZ4LZFuncompressedLZ4LZ4LZFuncompressednoneLZ4autolongsautolongslongstypeconcisetyperoaringcompressRunOnSerializationtruetypestreamendpointreplicastaskCounttaskCount
deaggregate
druid-kinesis-indexing-service
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]