This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2355a60  Avoid primary key violation in segment tables under certain 
conditions when appending data to same interval (#11714)
2355a60 is described below

commit 2355a60419fe423faae9af7d95b97199b11309d7
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Wed Sep 22 17:21:48 2021 -0700

    Avoid primary key violation in segment tables under certain conditions when 
appending data to same interval (#11714)
    
    * Fix issue of duplicate key under certain conditions when loading late 
data in streaming. Also fixes a documentation issue with 
skipSegmentLineageCheck.
    
    * maxId may be null at this point, need to check for that
    
    * Remove hypothetical case (it cannot happen)
    
    * Reverting a compaction simply "kills" the compacted segment, so the 
previously used, overshadowed segments become visible again
    
    * Add comments
---
 .../IndexerSQLMetadataStorageCoordinator.java      |  85 +++--
 .../appenderator/BaseAppenderatorDriver.java       |   2 +-
 .../realtime/appenderator/SegmentAllocator.java    |   2 +-
 .../appenderator/StreamAppenderatorDriver.java     |   6 +-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 407 ++++++++++++++++++++-
 5 files changed, 476 insertions(+), 26 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 4887c90..c5081f9 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -253,13 +253,13 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return numSegmentsMarkedUnused;
   }
 
-  private List<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
+  private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
       final Handle handle,
       final String dataSource,
       final Interval interval
   ) throws IOException
   {
-    final List<SegmentIdWithShardSpec> identifiers = new ArrayList<>();
+    final Set<SegmentIdWithShardSpec> identifiers = new HashSet<>();
 
     final ResultIterator<byte[]> dbSegments =
         handle.createQuery(
@@ -843,15 +843,30 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           .execute();
   }
 
+  /**
+   * This function creates a new segment for the given datasource/interval/etc. A critical
+   * aspect of the creation is to make sure that the new version & new partition number will make
+   * sense given the existing segments & pending segments. It is also very important to avoid
+   * clashes with existing pending & used/unused segments.
+   * @param handle Database handle
+   * @param dataSource datasource for the new segment
+   * @param interval interval for the new segment
+   * @param partialShardSpec Shard spec info minus segment id stuff
+   * @param existingVersion Version of segments in interval, used to compute 
the version of the very first segment in
+   *                        interval
+   * @return
+   * @throws IOException
+   */
   @Nullable
   private SegmentIdWithShardSpec createNewSegment(
       final Handle handle,
       final String dataSource,
       final Interval interval,
       final PartialShardSpec partialShardSpec,
-      final String maxVersion
+      final String existingVersion
   ) throws IOException
   {
+    // Get the time chunk and associated data segments for the given interval, 
if any
     final List<TimelineObjectHolder<String, DataSegment>> existingChunks = 
getTimelineForIntervalsWithHandle(
         handle,
         dataSource,
@@ -884,66 +899,94 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
             // See PartitionIds.
             .filter(segment -> 
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
           // Don't use the stream API for performance.
+          // Note that this will compute the max id of existing, visible, data 
segments in the time chunk:
           if (maxId == null || maxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
             maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
           }
         }
       }
 
-      final List<SegmentIdWithShardSpec> pendings = 
getPendingSegmentsForIntervalWithHandle(
+      // Get the version of the existing chunk, we might need it in some of 
the cases below
+      // to compute the new identifier's version
+      @Nullable
+      final String versionOfExistingChunk;
+      if (!existingChunks.isEmpty()) {
+        // remember only one chunk possible for given interval so get the 
first & only one
+        versionOfExistingChunk = existingChunks.get(0).getVersion();
+      } else {
+        versionOfExistingChunk = null;
+      }
+
+      // next, we need to enrich the maxId computed before with the 
information of the pending segments
+      // it is possible that a pending segment has a higher id in which case 
we need that, it will work,
+      // and it will avoid clashes when inserting the new pending segment 
later in the caller of this method
+      final Set<SegmentIdWithShardSpec> pendings = 
getPendingSegmentsForIntervalWithHandle(
           handle,
           dataSource,
           interval
       );
-
+      // Make sure we add the maxId we obtained from the segments table:
       if (maxId != null) {
         pendings.add(maxId);
       }
-
+      //  Now compute the maxId with all the information: pendings + segments:
+      // The versionOfExistingChunk filter ensures that we pick the max id with the version of the existing chunk
+      // in the case that there may be a pending segment with a higher version but no corresponding used segments,
+      // which may generate a clash with an existing segment once the new id is generated
       maxId = pendings.stream()
                       .filter(id -> 
id.getShardSpec().sharePartitionSpace(partialShardSpec))
+                      .filter(id -> versionOfExistingChunk == null ? true : 
id.getVersion().equals(versionOfExistingChunk))
                       .max((id1, id2) -> {
                         final int versionCompare = 
id1.getVersion().compareTo(id2.getVersion());
                         if (versionCompare != 0) {
                           return versionCompare;
                         } else {
-                          return 
Integer.compare(id1.getShardSpec().getPartitionNum(), 
id2.getShardSpec().getPartitionNum());
+                          return Integer.compare(
+                              id1.getShardSpec().getPartitionNum(),
+                              id2.getShardSpec().getPartitionNum()
+                          );
                         }
                       })
                       .orElse(null);
 
-      // Find the major version of existing segments
-      @Nullable final String versionOfExistingChunks;
-      if (!existingChunks.isEmpty()) {
-        versionOfExistingChunks = existingChunks.get(0).getVersion();
-      } else if (!pendings.isEmpty()) {
-        versionOfExistingChunks = pendings.get(0).getVersion();
+      // The following code attempts to compute the new version, if this
+      // new version is not null at the end of next block then it will be
+      // used as the new version in the case for initial or appended segment
+      final String newSegmentVersion;
+      if (versionOfExistingChunk != null) {
+        // segment version overrides, so pick that now that we know it exists
+        newSegmentVersion = versionOfExistingChunk;
+      } else if (!pendings.isEmpty() && maxId != null) {
+        // there are no visible segments in the time chunk, so pick the maxId of pendings, as computed above
+        newSegmentVersion = maxId.getVersion();
       } else {
-        versionOfExistingChunks = null;
+        // no segments, no pendings, so this must be the very first segment 
created for this interval
+        newSegmentVersion = null;
       }
 
       if (maxId == null) {
+        // When appending segments, null maxId means that we are allocating 
the very initial
+        // segment for this time chunk.
         // This code is executed when the Overlord coordinates segment 
allocation, which is either you append segments
-        // or you use segment lock. When appending segments, null maxId means 
that we are allocating the very initial
-        // segment for this time chunk. Since the core partitions set is not 
determined for appended segments, we set
+        // or you use segment lock. Since the core partitions set is not 
determined for appended segments, we set
         // it 0. When you use segment lock, the core partitions set doesn't 
work with it. We simply set it 0 so that the
         // OvershadowableManager handles the atomic segment update.
         final int newPartitionId = 
partialShardSpec.useNonRootGenerationPartitionSpace()
                                    ? 
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
                                    : PartitionIds.ROOT_GEN_START_PARTITION_ID;
-        String version = versionOfExistingChunks == null ? maxVersion : 
versionOfExistingChunks;
+        String version = newSegmentVersion == null ? existingVersion : 
newSegmentVersion;
         return new SegmentIdWithShardSpec(
             dataSource,
             interval,
             version,
             partialShardSpec.complete(jsonMapper, newPartitionId, 0)
         );
-      } else if (!maxId.getInterval().equals(interval) || 
maxId.getVersion().compareTo(maxVersion) > 0) {
+      } else if (!maxId.getInterval().equals(interval) || 
maxId.getVersion().compareTo(existingVersion) > 0) {
         log.warn(
-            "Cannot allocate new segment for dataSource[%s], interval[%s], 
maxVersion[%s]: conflicting segment[%s].",
+            "Cannot allocate new segment for dataSource[%s], interval[%s], 
existingVersion[%s]: conflicting segment[%s].",
             dataSource,
             interval,
-            maxVersion,
+            existingVersion,
             maxId
         );
         return null;
@@ -958,7 +1001,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         return new SegmentIdWithShardSpec(
             dataSource,
             maxId.getInterval(),
-            Preconditions.checkNotNull(versionOfExistingChunks, 
"versionOfExistingChunks"),
+            Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
             partialShardSpec.complete(
                 jsonMapper,
                 maxId.getShardSpec().getPartitionNum() + 1,
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index bdd572c..9deb657 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -383,7 +383,7 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
    * @param sequenceName             sequenceName for this row's segment
    * @param committerSupplier        supplier of a committer associated with 
all data that has been added, including this row
    *                                 if {@param allowIncrementalPersists} is 
set to false then this will not be used
-   * @param skipSegmentLineageCheck  if true, perform lineage validation using 
previousSegmentId for this sequence.
+   * @param skipSegmentLineageCheck  if false, perform lineage validation 
using previousSegmentId for this sequence.
    *                                 Should be set to false if replica tasks 
would index events in same order
    * @param allowIncrementalPersists whether to allow persist to happen when 
maxRowsInMemory or intermediate persist period
    *                                 threshold is hit
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
index 1bffd81..644efa5 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java
@@ -37,7 +37,7 @@ public interface SegmentAllocator
    *                                When skipSegmentLineageCheck is false, 
this can be null if it is the first call
    *                                for the same sequenceName.
    *                                When skipSegmentLineageCheck is true, this 
will be ignored.
-   * @param skipSegmentLineageCheck if true, perform lineage validation using 
previousSegmentId for this sequence.
+   * @param skipSegmentLineageCheck if false, perform lineage validation using 
previousSegmentId for this sequence.
    *                                Should be set to false if replica tasks 
would index events in same order
    *
    * @return the pending segment identifier, or null if it was impossible to 
allocate a new segment
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 53212e2..e4a84cb 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -167,8 +167,10 @@ public class StreamAppenderatorDriver extends 
BaseAppenderatorDriver
    * @param sequenceName             sequenceName for this row's segment
    * @param committerSupplier        supplier of a committer associated with 
all data that has been added, including this row
    *                                 if {@param allowIncrementalPersists} is 
set to false then this will not be used
-   * @param skipSegmentLineageCheck  if true, perform lineage validation using 
previousSegmentId for this sequence.
-   *                                 Should be set to false if replica tasks 
would index events in same order
+   * @param skipSegmentLineageCheck  Should be set to {@code false} to perform lineage validation using previousSegmentId for this sequence.
+   *                                 Note that for Kafka Streams we should disable this check and set this parameter to
+   *                                 {@code true}.
+   *                                 If {@code true}, lineage validation is skipped (not enforced).
    * @param allowIncrementalPersists whether to allow persist to happen when 
maxRowsInMemory or intermediate persist period
    *                                 threshold is hit
    *
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 135c893..3f9431c 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -275,7 +275,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest
 
   private void markAllSegmentsUnused()
   {
-    for (final DataSegment segment : SEGMENTS) {
+    markAllSegmentsUnused(SEGMENTS);
+  }
+
+  private void markAllSegmentsUnused(Set<DataSegment> segments)
+  {
+    for (final DataSegment segment : segments) {
       Assert.assertEquals(
           1,
           (int) derbyConnector.getDBI().<Integer>withHandle(
@@ -296,6 +301,45 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     }
   }
 
+  private void markAllSegmentsUsed(Set<DataSegment> segments)
+  {
+    for (final DataSegment segment : segments) {
+      Assert.assertEquals(
+          1,
+          (int) derbyConnector.getDBI().<Integer>withHandle(
+              new HandleCallback<Integer>()
+              {
+                @Override
+                public Integer withHandle(Handle handle)
+                {
+                  String request = StringUtils.format(
+                      "UPDATE %s SET used = true WHERE id = :id",
+                      
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
+                  );
+                  return handle.createStatement(request).bind("id", 
segment.getId().toString()).execute();
+                }
+              }
+          )
+      );
+    }
+  }
+
+  private List<String> retrievePendingSegmentIds()
+  {
+    final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
+    return derbyConnector.retryWithHandle(
+        new HandleCallback<List<String>>()
+        {
+          @Override
+          public List<String> withHandle(Handle handle)
+          {
+            return handle.createQuery("SELECT id FROM " + table + "  ORDER BY 
id")
+                         .map(StringMapper.FIRST)
+                         .list();
+          }
+        }
+    );
+  }
   private List<String> retrieveUsedSegmentIds()
   {
     final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
@@ -313,6 +357,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  private List<String> retrieveUnusedSegmentIds()
+  {
+    final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    return derbyConnector.retryWithHandle(
+        new HandleCallback<List<String>>()
+        {
+          @Override
+          public List<String> withHandle(Handle handle)
+          {
+            return handle.createQuery("SELECT id FROM " + table + " WHERE used 
= false ORDER BY id")
+                         .map(StringMapper.FIRST)
+                         .list();
+          }
+        }
+    );
+  }
+
+
   private Boolean insertUsedSegments(Set<DataSegment> dataSegments)
   {
     final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
@@ -1203,6 +1265,349 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3",
 identifier4.toString());
   }
 
+  /**
+   * This test simulates an issue detected in the field, consisting of the following sequence of events:
+   * - A kafka stream segment was created on a given interval
+   * - Later, after the above was published, another segment on same interval 
was created by the stream
+   * - Later, after the above was published, another segment on same interval 
was created by the stream
+   * - Later a compaction was issued for the three segments above
+   * - Later, after the above was published, another segment on same interval 
was created by the stream
+   * - Later, the compacted segment got dropped due to a drop rule
+   * - Later, after the above was dropped, another segment on same interval 
was created by the stream but this
+   *   time there was an integrity violation in the pending segments table 
because the
+   *   {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, 
String, Interval, PartialShardSpec, String)}
+   *   method returned a segment id that already existed in the pending segments table
+   */
+  @Test
+  public void testAllocatePendingSegmentAfterDroppingExistingSegment()
+  {
+    String maxVersion = "version_newer_newer";
+
+    // simulate one load using kafka streaming
+    final PartialShardSpec partialShardSpec = 
NumberedPartialShardSpec.instance();
+    final String dataSource = "ds";
+    final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+    final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq",
+        null,
+        interval,
+        partialShardSpec,
+        "version",
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version",
 identifier.toString());
+
+    // simulate one more load using kafka streaming (as if previous segment 
was published, note different sequence name)
+    final SegmentIdWithShardSpec identifier1 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq2",
+        identifier.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1",
 identifier1.toString());
+
+    // simulate one more load using kafka streaming (as if previous segment 
was published, note different sequence name)
+    final SegmentIdWithShardSpec identifier2 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq3",
+        identifier1.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2",
 identifier2.toString());
+
+    // now simulate that one compaction was done (batch) ingestion for same 
interval (like reindex of the previous three):
+    DataSegment segment = new DataSegment(
+        "ds",
+        Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+        "version_new",
+        ImmutableMap.of(),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(0),
+        9,
+        100
+    );
+    Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+    List<String> ids = retrieveUsedSegmentIds();
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new",
 ids.get(0));
+
+    // one more load on same interval:
+    final SegmentIdWithShardSpec identifier3 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq4",
+        identifier1.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1",
 identifier3.toString());
+
+    // now drop the used segment previously loaded:
+    markAllSegmentsUnused(ImmutableSet.of(segment));
+
+    // and final load, this reproduces an issue that could happen with 
multiple streaming appends,
+    // followed by a reindex, followed by a drop, and more streaming data 
coming in for same interval
+    final SegmentIdWithShardSpec identifier4 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq5",
+        identifier1.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2",
 identifier4.toString());
+
+  }
+
+  /**
+   * Slightly different from the above test, in that it involves a reverted compaction
+   1) used segments of version = A, id = 0, 1, 2
+   2) overwrote segments of version = B, id = 0 <= compaction
+   3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing
+   4) pending segment of version = B, id = 1 <= appending new data, aborted
+   5) reverted compaction, mark segments used for version = A, id = 0, 1, 2, 
and mark compacted segments unused
+   6) used segments of version = A, id = 0, 1, 2
+   7) pending segment of version = B, id = 1
+   */
+  @Test
+  public void testAnotherAllocatePendingSegmentAfterRevertingCompaction()
+  {
+    String maxVersion = "Z";
+
+    // 1.0) simulate one append load
+    final PartialShardSpec partialShardSpec = 
NumberedPartialShardSpec.instance();
+    final String dataSource = "ds";
+    final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+    final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq",
+        null,
+        interval,
+        partialShardSpec,
+        "A",
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", 
identifier.toString());
+    // Assume it publishes; create its corresponding segment
+    DataSegment segment = new DataSegment(
+        "ds",
+        Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+        "A",
+        ImmutableMap.of(),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(0),
+        9,
+        100
+    );
+    Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+    List<String> ids = retrieveUsedSegmentIds();
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", 
ids.get(0));
+
+
+    // 1.1) simulate one more append load  (as if previous segment was 
published, note different sequence name)
+    final SegmentIdWithShardSpec identifier1 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq2",
+        identifier.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", 
identifier1.toString());
+    // Assume it publishes; create its corresponding segment
+    segment = new DataSegment(
+        "ds",
+        Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+        "A",
+        ImmutableMap.of(),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(1),
+        9,
+        100
+    );
+    Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+    ids = retrieveUsedSegmentIds();
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", 
ids.get(1));
+
+
+    // 1.2) simulate one more append load  (as if previous segment was 
published, note different sequence name)
+    final SegmentIdWithShardSpec identifier2 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq3",
+        identifier1.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", 
identifier2.toString());
+    // Assume it publishes; create its corresponding segment
+    segment = new DataSegment(
+        "ds",
+        Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+        "A",
+        ImmutableMap.of(),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(2),
+        9,
+        100
+    );
+    // state so far:
+    // pendings: A: 0,1,2
+    // used segments A: 0,1,2
+    // unused segments:
+    Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+    ids = retrieveUsedSegmentIds();
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", 
ids.get(2));
+
+
+    // 2)
+    // now simulate that one compaction was done (batch) ingestion for same 
interval (like reindex of the previous three):
+    DataSegment compactedSegment = new DataSegment(
+        "ds",
+        Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+        "B",
+        ImmutableMap.of(),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(0),
+        9,
+        100
+    );
+    Assert.assertTrue(insertUsedSegments(ImmutableSet.of(compactedSegment)));
+    ids = retrieveUsedSegmentIds();
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B", 
ids.get(3));
+    // 3) When overshadowing, segments are still marked as "used" in the 
segments table
+    // state so far:
+    // pendings: A: 0,1,2
+    // used segments: A: 0,1,2; B: 0 <- new compacted segment, overshadows 
previous version A
+    // unused segment:
+
+    // 4) pending segment of version = B, id = 1 <= appending new data, aborted
+    final SegmentIdWithShardSpec identifier3 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq4",
+        identifier2.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1", 
identifier3.toString());
+    // no corresponding segment, pending aborted
+    // state so far:
+    // pendings: A: 0,1,2; B:1 (note that B_1 does not make it into segments 
since its task aborted)
+    // used segments: A: 0,1,2; B: 0 <-  compacted segment, overshadows 
previous version A
+    // unused segment:
+
+    // 5) reverted compaction (by marking B_0 as unused)
+    // Reverting a compaction is a manual metadata update which is basically the following two steps:
+    markAllSegmentsUnused(ImmutableSet.of(compactedSegment)); // <- drop 
compacted segment
+    //        pending: version = A, id = 0,1,2
+    //                 version = B, id = 1
+    //
+    //        used segment: version = A, id = 0,1,2
+    //        unused segment: version = B, id = 0
+    List<String> pendings = retrievePendingSegmentIds();
+    Assert.assertTrue(pendings.size() == 4);
+
+    List<String> used = retrieveUsedSegmentIds();
+    Assert.assertTrue(used.size() == 3);
+
+    List<String> unused = retrieveUnusedSegmentIds();
+    Assert.assertTrue(unused.size() == 1);
+
+    // Simulate one more append load
+    final SegmentIdWithShardSpec identifier4 = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq5",
+        identifier1.toString(),
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    // maxid = B_1 -> new partno = 2
+    // versionofexistingchunk=A
+    // ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", 
identifier4.toString());
+    // Assume it publishes; create its corresponding segment
+    segment = new DataSegment(
+        "ds",
+        Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+        "A",
+        ImmutableMap.of(),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(3),
+        9,
+        100
+    );
+    //        pending: version = A, id = 0,1,2,3
+    //                 version = B, id = 1
+    //
+    //        used segment: version = A, id = 0,1,2,3
+    //        unused segment: version = B, id = 0
+    Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+    ids = retrieveUsedSegmentIds();
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", 
ids.get(3));
+
+  }
+  
+  @Test
+  public void testNoPendingSegmentsAndOneUsedSegment()
+  {
+    String maxVersion = "Z";
+
+    // create one used segment
+    DataSegment segment = new DataSegment(
+        "ds",
+        Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
+        "A",
+        ImmutableMap.of(),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new LinearShardSpec(0),
+        9,
+        100
+    );
+    Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
+    List<String> ids = retrieveUsedSegmentIds();
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", 
ids.get(0));
+
+
+    // simulate one aborted append load
+    final PartialShardSpec partialShardSpec = 
NumberedPartialShardSpec.instance();
+    final String dataSource = "ds";
+    final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+    final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+        dataSource,
+        "seq",
+        null,
+        interval,
+        partialShardSpec,
+        maxVersion,
+        true
+    );
+    
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", 
identifier.toString());
+    
+  }
+
+
+
   @Test
   public void testDeletePendingSegment() throws InterruptedException
   {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to