This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2355a60 Avoid primary key violation in segment tables under certain
conditions when appending data to same interval (#11714)
2355a60 is described below
commit 2355a60419fe423faae9af7d95b97199b11309d7
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Wed Sep 22 17:21:48 2021 -0700
Avoid primary key violation in segment tables under certain conditions when
appending data to same interval (#11714)
* Fix issue of duplicate key under certain conditions when loading late
data in streaming. Also fixes a documentation issue with
skipSegmentLineageCheck.
* maxId may be null at this point, need to check for that
* Remove hypothetical case (it cannot happen)
* Reverting compaction simply "kills" the compacted segment, so that the
previously used, overshadowed segments become visible again
* Add comments
---
.../IndexerSQLMetadataStorageCoordinator.java | 85 +++--
.../appenderator/BaseAppenderatorDriver.java | 2 +-
.../realtime/appenderator/SegmentAllocator.java | 2 +-
.../appenderator/StreamAppenderatorDriver.java | 6 +-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 407 ++++++++++++++++++++-
5 files changed, 476 insertions(+), 26 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 4887c90..c5081f9 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -253,13 +253,13 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return numSegmentsMarkedUnused;
}
- private List<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
+ private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
) throws IOException
{
- final List<SegmentIdWithShardSpec> identifiers = new ArrayList<>();
+ final Set<SegmentIdWithShardSpec> identifiers = new HashSet<>();
final ResultIterator<byte[]> dbSegments =
handle.createQuery(
@@ -843,15 +843,30 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.execute();
}
+ /**
+ * This function creates a new segment for the given
datasource/interval/etc. A critical
+ * aspect of the creation is to make sure that the new version & new
partition number will make
+ * sense given the existing segments & pending segments. It is also very
important to avoid
+ * clashes with existing pending & used/unused segments.
+ * @param handle Database handle
+ * @param dataSource datasource for the new segment
+ * @param interval interval for the new segment
+ * @param partialShardSpec Shard spec info minus segment id stuff
+ * @param existingVersion Version of segments in interval, used to compute
the version of the very first segment in
+ * interval
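+ * @return the identifier for the new segment, or null if it cannot be allocated because of a conflicting segment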
+ * @throws IOException
+ */
@Nullable
private SegmentIdWithShardSpec createNewSegment(
final Handle handle,
final String dataSource,
final Interval interval,
final PartialShardSpec partialShardSpec,
- final String maxVersion
+ final String existingVersion
) throws IOException
{
+ // Get the time chunk and associated data segments for the given interval,
if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(
handle,
dataSource,
@@ -884,66 +899,94 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// See PartitionIds.
.filter(segment ->
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
// Don't use the stream API for performance.
+ // Note that this will compute the max id of existing, visible, data
segments in the time chunk:
if (maxId == null || maxId.getShardSpec().getPartitionNum() <
segment.getShardSpec().getPartitionNum()) {
maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
}
- final List<SegmentIdWithShardSpec> pendings =
getPendingSegmentsForIntervalWithHandle(
+ // Get the version of the existing chunk, we might need it in some of
the cases below
+ // to compute the new identifier's version
+ @Nullable
+ final String versionOfExistingChunk;
+ if (!existingChunks.isEmpty()) {
+ // remember only one chunk possible for given interval so get the
first & only one
+ versionOfExistingChunk = existingChunks.get(0).getVersion();
+ } else {
+ versionOfExistingChunk = null;
+ }
+
+ // next, we need to enrich the maxId computed before with the
information of the pending segments
+ // it is possible that a pending segment has a higher id in which case
we need that, it will work,
+ // and it will avoid clashes when inserting the new pending segment
later in the caller of this method
+ final Set<SegmentIdWithShardSpec> pendings =
getPendingSegmentsForIntervalWithHandle(
handle,
dataSource,
interval
);
-
+ // Make sure we add the maxId we obtained from the segments table:
if (maxId != null) {
pendings.add(maxId);
}
-
+ // Now compute the maxId with all the information: pendings + segments:
+ // The versionOfExistingChunk filter is to ensure that we pick the max id
with the version of the existing chunk
+ // in the case that there may be a pending segment with a higher version
but no corresponding used segments
+ // which may generate a clash with an existing segment once the new id
is generated
maxId = pendings.stream()
.filter(id ->
id.getShardSpec().sharePartitionSpace(partialShardSpec))
+ .filter(id -> versionOfExistingChunk == null ? true :
id.getVersion().equals(versionOfExistingChunk))
.max((id1, id2) -> {
final int versionCompare =
id1.getVersion().compareTo(id2.getVersion());
if (versionCompare != 0) {
return versionCompare;
} else {
- return
Integer.compare(id1.getShardSpec().getPartitionNum(),
id2.getShardSpec().getPartitionNum());
+ return Integer.compare(
+ id1.getShardSpec().getPartitionNum(),
+ id2.getShardSpec().getPartitionNum()
+ );
}
})
.orElse(null);
- // Find the major version of existing segments
- @Nullable final String versionOfExistingChunks;
- if (!existingChunks.isEmpty()) {
- versionOfExistingChunks = existingChunks.get(0).getVersion();
- } else if (!pendings.isEmpty()) {
- versionOfExistingChunks = pendings.get(0).getVersion();
+ // The following code attempts to compute the new version, if this
+ // new version is not null at the end of next block then it will be
+ // used as the new version in the case for initial or appended segment
+ final String newSegmentVersion;
+ if (versionOfExistingChunk != null) {
+ // segment version overrides, so pick that now that we know it exists
+ newSegmentVersion = versionOfExistingChunk;
+ } else if (!pendings.isEmpty() && maxId != null) {
+ // there are no visible segments in the time chunk, so pick the maxId
of pendings, as computed above
+ newSegmentVersion = maxId.getVersion();
} else {
- versionOfExistingChunks = null;
+ // no segments, no pendings, so this must be the very first segment
created for this interval
+ newSegmentVersion = null;
}
if (maxId == null) {
+ // When appending segments, null maxId means that we are allocating
the very initial
+ // segment for this time chunk.
// This code is executed when the Overlord coordinates segment
allocation, which is either you append segments
- // or you use segment lock. When appending segments, null maxId means
that we are allocating the very initial
- // segment for this time chunk. Since the core partitions set is not
determined for appended segments, we set
+ // or you use segment lock. Since the core partitions set is not
determined for appended segments, we set
// it 0. When you use segment lock, the core partitions set doesn't
work with it. We simply set it 0 so that the
// OvershadowableManager handles the atomic segment update.
final int newPartitionId =
partialShardSpec.useNonRootGenerationPartitionSpace()
?
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
- String version = versionOfExistingChunks == null ? maxVersion :
versionOfExistingChunks;
+ String version = newSegmentVersion == null ? existingVersion :
newSegmentVersion;
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
- } else if (!maxId.getInterval().equals(interval) ||
maxId.getVersion().compareTo(maxVersion) > 0) {
+ } else if (!maxId.getInterval().equals(interval) ||
maxId.getVersion().compareTo(existingVersion) > 0) {
log.warn(
- "Cannot allocate new segment for dataSource[%s], interval[%s],
maxVersion[%s]: conflicting segment[%s].",
+ "Cannot allocate new segment for dataSource[%s], interval[%s],
existingVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
- maxVersion,
+ existingVersion,
maxId
);
return null;
@@ -958,7 +1001,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return new SegmentIdWithShardSpec(
dataSource,
maxId.getInterval(),
- Preconditions.checkNotNull(versionOfExistingChunks,
"versionOfExistingChunks"),
+ Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
partialShardSpec.complete(
jsonMapper,
maxId.getShardSpec().getPartitionNum() + 1,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index bdd572c..9deb657 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -383,7 +383,7 @@ public abstract class BaseAppenderatorDriver implements
Closeable
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with
all data that has been added, including this row
* if {@param allowIncrementalPersists} is
set to false then this will not be used
- * @param skipSegmentLineageCheck if true, perform lineage validation using
previousSegmentId for this sequence.
+ * @param skipSegmentLineageCheck if false, perform lineage validation
using previousSegmentId for this sequence.
* Should be set to false if replica tasks
would index events in same order
* @param allowIncrementalPersists whether to allow persist to happen when
maxRowsInMemory or intermediate persist period
* threshold is hit
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
index 1bffd81..644efa5 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
@@ -37,7 +37,7 @@ public interface SegmentAllocator
* When skipSegmentLineageCheck is false,
this can be null if it is the first call
* for the same sequenceName.
* When skipSegmentLineageCheck is true, this
will be ignored.
- * @param skipSegmentLineageCheck if true, perform lineage validation using
previousSegmentId for this sequence.
+ * @param skipSegmentLineageCheck if false, perform lineage validation using
previousSegmentId for this sequence.
* Should be set to false if replica tasks
would index events in same order
*
* @return the pending segment identifier, or null if it was impossible to
allocate a new segment
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 53212e2..e4a84cb 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -167,8 +167,10 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with
all data that has been added, including this row
* if {@param allowIncrementalPersists} is
set to false then this will not be used
- * @param skipSegmentLineageCheck if true, perform lineage validation using
previousSegmentId for this sequence.
- * Should be set to false if replica tasks
would index events in same order
+ * @param skipSegmentLineageCheck Should be set to {@code false} to perform
lineage validation using previousSegmentId for this sequence.
+ * Note that for Kafka Streams we should
disable this check and set this parameter to
+ * {@code true}.
+ * If {@code true}, lineage validation is skipped
(not enforced).
* @param allowIncrementalPersists whether to allow persist to happen when
maxRowsInMemory or intermediate persist period
* threshold is hit
*
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 135c893..3f9431c 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -275,7 +275,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest
private void markAllSegmentsUnused()
{
- for (final DataSegment segment : SEGMENTS) {
+ markAllSegmentsUnused(SEGMENTS);
+ }
+
+ private void markAllSegmentsUnused(Set<DataSegment> segments)
+ {
+ for (final DataSegment segment : segments) {
Assert.assertEquals(
1,
(int) derbyConnector.getDBI().<Integer>withHandle(
@@ -296,6 +301,45 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
}
+ private void markAllSegmentsUsed(Set<DataSegment> segments)
+ {
+ for (final DataSegment segment : segments) {
+ Assert.assertEquals(
+ 1,
+ (int) derbyConnector.getDBI().<Integer>withHandle(
+ new HandleCallback<Integer>()
+ {
+ @Override
+ public Integer withHandle(Handle handle)
+ {
+ String request = StringUtils.format(
+ "UPDATE %s SET used = true WHERE id = :id",
+
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
+ );
+ return handle.createStatement(request).bind("id",
segment.getId().toString()).execute();
+ }
+ }
+ )
+ );
+ }
+ }
+
+ private List<String> retrievePendingSegmentIds()
+ {
+ final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
+ return derbyConnector.retryWithHandle(
+ new HandleCallback<List<String>>()
+ {
+ @Override
+ public List<String> withHandle(Handle handle)
+ {
+ return handle.createQuery("SELECT id FROM " + table + " ORDER BY
id")
+ .map(StringMapper.FIRST)
+ .list();
+ }
+ }
+ );
+ }
private List<String> retrieveUsedSegmentIds()
{
final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
@@ -313,6 +357,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
+ private List<String> retrieveUnusedSegmentIds()
+ {
+ final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+ return derbyConnector.retryWithHandle(
+ new HandleCallback<List<String>>()
+ {
+ @Override
+ public List<String> withHandle(Handle handle)
+ {
+ return handle.createQuery("SELECT id FROM " + table + " WHERE used
= false ORDER BY id")
+ .map(StringMapper.FIRST)
+ .list();
+ }
+ }
+ );
+ }
+
+
private Boolean insertUsedSegments(Set<DataSegment> dataSegments)
{
final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
@@ -1203,6 +1265,349 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3",
identifier4.toString());
}
+ /**
+ * This test simulates an issue detected in the field consisting of the
following sequence of events:
+ * - A kafka stream segment was created on a given interval
+ * - Later, after the above was published, another segment on same interval
was created by the stream
+ * - Later, after the above was published, another segment on same interval
was created by the stream
+ * - Later a compaction was issued for the three segments above
+ * - Later, after the above was published, another segment on same interval
was created by the stream
+ * - Later, the compacted segment got dropped due to a drop rule
+ * - Later, after the above was dropped, another segment on same interval
was created by the stream but this
+ * time there was an integrity violation in the pending segments table
because the
+ * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle,
String, Interval, PartialShardSpec, String)}
+ * method returned a segment id that already existed in the pending
segments table
+ */
+ @Test
+ public void testAllocatePendingSegmentAfterDroppingExistingSegment()
+ {
+ String maxVersion = "version_newer_newer";
+
+ // simulate one load using kafka streaming
+ final PartialShardSpec partialShardSpec =
NumberedPartialShardSpec.instance();
+ final String dataSource = "ds";
+ final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+ final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq",
+ null,
+ interval,
+ partialShardSpec,
+ "version",
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version",
identifier.toString());
+
+ // simulate one more load using kafka streaming (as if previous segment
was published, note different sequence name)
+ final SegmentIdWithShardSpec identifier1 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq2",
+ identifier.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1",
identifier1.toString());
+
+ // simulate one more load using kafka streaming (as if previous segment
was published, note different sequence name)
+ final SegmentIdWithShardSpec identifier2 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq3",
+ identifier1.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2",
identifier2.toString());
+
+ // now simulate that one compaction (batch ingestion) was done for the same
interval (like a reindex of the previous three):
+ DataSegment segment = new DataSegment(
+ "ds",
+ Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+ "version_new",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(0),
+ 9,
+ 100
+ );
+ Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+ List<String> ids = retrieveUsedSegmentIds();
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new",
ids.get(0));
+
+ // one more load on same interval:
+ final SegmentIdWithShardSpec identifier3 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq4",
+ identifier1.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1",
identifier3.toString());
+
+ // now drop the used segment previously loaded:
+ markAllSegmentsUnused(ImmutableSet.of(segment));
+
+ // and final load, this reproduces an issue that could happen with
multiple streaming appends,
+ // followed by a reindex, followed by a drop, and more streaming data
coming in for same interval
+ final SegmentIdWithShardSpec identifier4 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq5",
+ identifier1.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2",
identifier4.toString());
+
+ }
+
+ /**
+ * Slightly different from the above test, in that it involves a reverted
compaction
+ 1) used segments of version = A, id = 0, 1, 2
+ 2) overwrote segments of version = B, id = 0 <= compaction
+ 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing
+ 4) pending segment of version = B, id = 1 <= appending new data, aborted
+ 5) reverted compaction, mark segments used for version = A, id = 0, 1, 2,
and mark compacted segments unused
+ 6) used segments of version = A, id = 0, 1, 2
+ 7) pending segment of version = B, id = 1
+ */
+ @Test
+ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction()
+ {
+ String maxVersion = "Z";
+
+ // 1.0) simulate one append load
+ final PartialShardSpec partialShardSpec =
NumberedPartialShardSpec.instance();
+ final String dataSource = "ds";
+ final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+ final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq",
+ null,
+ interval,
+ partialShardSpec,
+ "A",
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A",
identifier.toString());
+ // Assume it publishes; create its corresponding segment
+ DataSegment segment = new DataSegment(
+ "ds",
+ Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+ "A",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(0),
+ 9,
+ 100
+ );
+ Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+ List<String> ids = retrieveUsedSegmentIds();
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A",
ids.get(0));
+
+
+ // 1.1) simulate one more append load (as if previous segment was
published, note different sequence name)
+ final SegmentIdWithShardSpec identifier1 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq2",
+ identifier.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1",
identifier1.toString());
+ // Assume it publishes; create its corresponding segment
+ segment = new DataSegment(
+ "ds",
+ Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+ "A",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(1),
+ 9,
+ 100
+ );
+ Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+ ids = retrieveUsedSegmentIds();
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1",
ids.get(1));
+
+
+ // 1.2) simulate one more append load (as if previous segment was
published, note different sequence name)
+ final SegmentIdWithShardSpec identifier2 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq3",
+ identifier1.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2",
identifier2.toString());
+ // Assume it publishes; create its corresponding segment
+ segment = new DataSegment(
+ "ds",
+ Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+ "A",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(2),
+ 9,
+ 100
+ );
+ // state so far:
+ // pendings: A: 0,1,2
+ // used segments A: 0,1,2
+ // unused segments:
+ Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+ ids = retrieveUsedSegmentIds();
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2",
ids.get(2));
+
+
+ // 2)
+ // now simulate that one compaction (batch ingestion) was done for the same
interval (like a reindex of the previous three):
+ DataSegment compactedSegment = new DataSegment(
+ "ds",
+ Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+ "B",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(0),
+ 9,
+ 100
+ );
+ Assert.assertTrue(insertUsedSegments(ImmutableSet.of(compactedSegment)));
+ ids = retrieveUsedSegmentIds();
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B",
ids.get(3));
+ // 3) When overshadowing, segments are still marked as "used" in the
segments table
+ // state so far:
+ // pendings: A: 0,1,2
+ // used segments: A: 0,1,2; B: 0 <- new compacted segment, overshadows
previous version A
+ // unused segment:
+
+ // 4) pending segment of version = B, id = 1 <= appending new data, aborted
+ final SegmentIdWithShardSpec identifier3 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq4",
+ identifier2.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1",
identifier3.toString());
+ // no corresponding segment, pending aborted
+ // state so far:
+ // pendings: A: 0,1,2; B:1 (note that B_1 does not make it into segments
since its task aborted)
+ // used segments: A: 0,1,2; B: 0 <- compacted segment, overshadows
previous version A
+ // unused segment:
+
+ // 5) reverted compaction (by marking B_0 as unused)
+ // Reverting compaction is a manual metadata update; since the version A segments are still
marked as used, here it only requires the following step:
+ markAllSegmentsUnused(ImmutableSet.of(compactedSegment)); // <- drop
compacted segment
+ // pending: version = A, id = 0,1,2
+ // version = B, id = 1
+ //
+ // used segment: version = A, id = 0,1,2
+ // unused segment: version = B, id = 0
+ List<String> pendings = retrievePendingSegmentIds();
+ Assert.assertTrue(pendings.size() == 4);
+
+ List<String> used = retrieveUsedSegmentIds();
+ Assert.assertTrue(used.size() == 3);
+
+ List<String> unused = retrieveUnusedSegmentIds();
+ Assert.assertTrue(unused.size() == 1);
+
+ // Simulate one more append load
+ final SegmentIdWithShardSpec identifier4 =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq5",
+ identifier1.toString(),
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
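+ // before the fix: maxid = B_1 -> new partno = 2, versionofexistingchunk=A, yielding
+ // ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2, which clashes with the pending A_2;
+ // with the version filter, maxid = A_2, so the new id must be A_3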
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3",
identifier4.toString());
+ // Assume it publishes; create its corresponding segment
+ segment = new DataSegment(
+ "ds",
+ Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+ "A",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(3),
+ 9,
+ 100
+ );
+ // pending: version = A, id = 0,1,2,3
+ // version = B, id = 1
+ //
+ // used segment: version = A, id = 0,1,2,3
+ // unused segment: version = B, id = 0
+ Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+ ids = retrieveUsedSegmentIds();
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3",
ids.get(3));
+
+ }
+
+ @Test
+ public void testNoPendingSegmentsAndOneUsedSegment()
+ {
+ String maxVersion = "Z";
+
+ // create one used segment
+ DataSegment segment = new DataSegment(
+ "ds",
+ Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+ "A",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(0),
+ 9,
+ 100
+ );
+ Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+ List<String> ids = retrieveUsedSegmentIds();
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A",
ids.get(0));
+
+
+ // simulate one aborted append load
+ final PartialShardSpec partialShardSpec =
NumberedPartialShardSpec.instance();
+ final String dataSource = "ds";
+ final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+ final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ dataSource,
+ "seq",
+ null,
+ interval,
+ partialShardSpec,
+ maxVersion,
+ true
+ );
+
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1",
identifier.toString());
+
+ }
+
+
+
@Test
public void testDeletePendingSegment() throws InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]