This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new aea64989907 Add test for atomicity of commitSegmentsAndMetadata (#17964)
aea64989907 is described below

commit aea6498990775994860ef78987bf2c438c5477cd
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu May 1 09:40:21 2025 +0530

    Add test for atomicity of commitSegmentsAndMetadata (#17964)
    
    Changes:
    - Add unit test to verify atomicity of `IndexerMetadataStorageCoordinator.commitSegmentsAndMetadata()`
    - Improve logs
    - Minor style refactor
---
 .../IndexerSQLMetadataStorageCoordinator.java      | 133 +++++++++++----------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |  70 ++++++++++-
 ...ataStorageCoordinatorSchemaPersistenceTest.java |   4 +-
 3 files changed, 136 insertions(+), 71 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 8f448168359..c93ca2516bd 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -26,7 +26,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
@@ -419,7 +418,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           transaction -> {
             // Try to update datasource metadata first
             if (startMetadata != null) {
-              final SegmentPublishResult result = 
updateDataSourceMetadataWithHandle(
+              final SegmentPublishResult result = 
updateDataSourceMetadataInTransaction(
                   transaction,
                   dataSource,
                   startMetadata,
@@ -432,13 +431,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
               }
             }
 
-            final Set<DataSegment> inserted =
-                insertSegments(
-                    transaction,
-                    segments,
-                    segmentSchemaMapping
-                );
-            return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
+            return SegmentPublishResult.ok(
+                Set.copyOf(insertSegments(transaction, segments, 
segmentSchemaMapping))
+            );
           }
       );
     }
@@ -563,7 +558,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     try {
       return inReadWriteDatasourceTransaction(
           dataSource,
-          transaction -> updateDataSourceMetadataWithHandle(
+          transaction -> updateDataSourceMetadataInTransaction(
               transaction,
               dataSource,
               startMetadata,
@@ -1199,7 +1194,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
             // Try to update datasource metadata first
             if (startMetadata != null) {
               final SegmentPublishResult metadataUpdateResult
-                  = updateDataSourceMetadataWithHandle(transaction, 
dataSource, startMetadata, endMetadata);
+                  = updateDataSourceMetadataInTransaction(transaction, 
dataSource, startMetadata, endMetadata);
 
               // Abort the transaction if datasource metadata update has failed
               if (!metadataUpdateResult.isSuccess()) {
@@ -1659,7 +1654,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     );
   }
 
-  private Set<DataSegment> insertSegments(
+  protected Set<DataSegment> insertSegments(
       final SegmentMetadataTransaction transaction,
       final Set<DataSegment> segments,
       @Nullable final SegmentSchemaMapping segmentSchemaMapping
@@ -2088,7 +2083,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   /**
    * Read dataSource metadata as bytes, from a specific handle. Returns null 
if there is no metadata.
    */
-  private @Nullable byte[] retrieveDataSourceMetadataWithHandleAsBytes(
+  private @Nullable byte[] retrieveDataSourceMetadataAsBytes(
       final SegmentMetadataTransaction transaction,
       final String dataSource
   )
@@ -2103,22 +2098,25 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   /**
-   * Compare-and-swap dataSource metadata in a transaction. This will only 
modify dataSource metadata if it equals
-   * oldCommitMetadata when this function is called (based on T.equals). This 
method is idempotent in that if
-   * the metadata already equals newCommitMetadata, it will return true.
-   *
-   * @param startMetadata dataSource metadata pre-insert must match this 
startMetadata according to
-   *                      {@link 
DataSourceMetadata#matches(DataSourceMetadata)}
-   * @param endMetadata   dataSource metadata post-insert will have this 
endMetadata merged in with
-   *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
+   * Compare-and-swap {@link DataSourceMetadata} for a datasource in a 
transaction.
+   * This method updates the metadata for the given datasource only if the
+   * currently persisted entry {@link DataSourceMetadata#matches matches} the
+   * {@code startMetadata}. If the current entry in the DB
+   * {@link DataSourceMetadata#matches matches} the {@code endMetadata}, this
+   * method returns immediately with success.
    *
-   * @return SUCCESS if dataSource metadata was updated from matching 
startMetadata to matching endMetadata, FAILURE or
-   * TRY_AGAIN if it definitely was not updated. This guarantee is meant to 
help
-   * {@link #commitSegmentsAndMetadata} achieve its own guarantee.
-   *
-   * @throws RuntimeException if state is unknown after this call
+   * @param startMetadata Current entry in the DB must
+   *                      {@link DataSourceMetadata#matches match} this value.
+   * @param endMetadata   The updated entry will be equal to the current entry
+   *                      {@link DataSourceMetadata#plus(DataSourceMetadata) 
plus}
+   *                      this value.
+   * @return Successful {@link SegmentPublishResult} if the metadata in the DB
+   * was successfully updated, or if the current entry already matches 
endMetadata.
+   * Otherwise, returns a failed {@link SegmentPublishResult} (retryable or 
not).
+   * @throws IOException          if an error occurred while serializing or 
deserializing.
+   * @throws NullPointerException if any of the arguments is null.
    */
-  protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+  protected SegmentPublishResult updateDataSourceMetadataInTransaction(
       final SegmentMetadataTransaction transaction,
       final String dataSource,
       final DataSourceMetadata startMetadata,
@@ -2129,7 +2127,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     Preconditions.checkNotNull(startMetadata, "startMetadata");
     Preconditions.checkNotNull(endMetadata, "endMetadata");
 
-    final byte[] oldCommitMetadataBytesFromDb = 
retrieveDataSourceMetadataWithHandleAsBytes(transaction, dataSource);
+    final byte[] oldCommitMetadataBytesFromDb =
+        retrieveDataSourceMetadataAsBytes(transaction, dataSource);
     final String oldCommitMetadataSha1FromDb;
     final DataSourceMetadata oldCommitMetadataFromDb;
 
@@ -2194,54 +2193,58 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
     );
 
-    final SegmentPublishResult retVal;
+    final SegmentPublishResult publishResult;
     if (oldCommitMetadataBytesFromDb == null) {
       // SELECT -> INSERT can fail due to races; callers must be prepared to 
retry.
-      final int numRows = transaction.getHandle().createStatement(
-          StringUtils.format(
-              "INSERT INTO %s (dataSource, created_date, 
commit_metadata_payload, commit_metadata_sha1) "
-              + "VALUES (:dataSource, :created_date, :commit_metadata_payload, 
:commit_metadata_sha1)",
-              dbTables.getDataSourceTable()
-          )
-      )
-                                .bind("dataSource", dataSource)
-                                .bind("created_date", 
DateTimes.nowUtc().toString())
-                                .bind("commit_metadata_payload", 
newCommitMetadataBytes)
-                                .bind("commit_metadata_sha1", 
newCommitMetadataSha1)
-                                .execute();
-
-      retVal = numRows == 1
+      final String insertSql = StringUtils.format(
+          "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, 
commit_metadata_sha1) "
+          + "VALUES (:dataSource, :created_date, :commit_metadata_payload, 
:commit_metadata_sha1)",
+          dbTables.getDataSourceTable()
+      );
+      final int numRows = transaction.getHandle().createStatement(insertSql)
+                                     .bind("dataSource", dataSource)
+                                     .bind("created_date", 
DateTimes.nowUtc().toString())
+                                     .bind("commit_metadata_payload", 
newCommitMetadataBytes)
+                                     .bind("commit_metadata_sha1", 
newCommitMetadataSha1)
+                                     .execute();
+
+      publishResult = numRows == 1
           ? SegmentPublishResult.ok(Set.of())
-          : SegmentPublishResult.retryableFailure("Failed to insert metadata 
for datasource[%s]", dataSource);
+          : SegmentPublishResult.retryableFailure("Insert failed");
     } else {
       // Expecting a particular old metadata; use the SHA1 in a 
compare-and-swap UPDATE
-      final int numRows = transaction.getHandle().createStatement(
-          StringUtils.format(
-              "UPDATE %s SET "
-              + "commit_metadata_payload = :new_commit_metadata_payload, "
-              + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
-              + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = 
:old_commit_metadata_sha1",
-              dbTables.getDataSourceTable()
-          )
-      )
-                                .bind("dataSource", dataSource)
-                                .bind("old_commit_metadata_sha1", 
oldCommitMetadataSha1FromDb)
-                                .bind("new_commit_metadata_payload", 
newCommitMetadataBytes)
-                                .bind("new_commit_metadata_sha1", 
newCommitMetadataSha1)
-                                .execute();
-
-      retVal = numRows == 1
+      final String updateSql = StringUtils.format(
+          "UPDATE %s SET "
+          + "commit_metadata_payload = :new_commit_metadata_payload, "
+          + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+          + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = 
:old_commit_metadata_sha1",
+          dbTables.getDataSourceTable()
+      );
+      final int numRows = transaction.getHandle().createStatement(updateSql)
+                                     .bind("dataSource", dataSource)
+                                     .bind("old_commit_metadata_sha1", 
oldCommitMetadataSha1FromDb)
+                                     .bind("new_commit_metadata_payload", 
newCommitMetadataBytes)
+                                     .bind("new_commit_metadata_sha1", 
newCommitMetadataSha1)
+                                     .execute();
+
+      publishResult = numRows == 1
           ? SegmentPublishResult.ok(Set.of())
-          : SegmentPublishResult.retryableFailure("Failed to update metadata 
for datasource[%s]", dataSource);
+          : SegmentPublishResult.retryableFailure("Compare-and-swap update 
failed");
     }
 
-    if (retVal.isSuccess()) {
-      log.info("Updated metadata from[%s] to[%s].", oldCommitMetadataFromDb, 
newCommitMetadata);
+    if (publishResult.isSuccess()) {
+      log.info(
+          "Updated metadata for datasource[%s] from[%s] to[%s].",
+          dataSource, oldCommitMetadataFromDb, newCommitMetadata
+      );
     } else {
-      log.info("Not updating metadata, compare-and-swap failure.");
+      log.info(
+          "Failed to update metadata for datasource[%s] due to reason[%s].",
+          dataSource, publishResult.getErrorMsg()
+      );
     }
 
-    return retVal;
+    return publishResult;
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index b902b1be608..f7c5af17ecf 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.error.ExceptionMatcher;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -68,6 +69,7 @@ import org.apache.druid.timeline.partition.PartitionIds;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.apache.druid.timeline.partition.TombstoneShardSpec;
 import org.assertj.core.api.Assertions;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.After;
@@ -90,6 +92,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -195,7 +198,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     )
     {
       @Override
-      protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+      protected SegmentPublishResult updateDataSourceMetadataInTransaction(
           SegmentMetadataTransaction transaction,
           String dataSource,
           DataSourceMetadata startMetadata,
@@ -204,7 +207,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
       {
         // Count number of times this method is called.
         metadataUpdateCounter.getAndIncrement();
-        return super.updateDataSourceMetadataWithHandle(transaction, 
dataSource, startMetadata, endMetadata);
+        return super.updateDataSourceMetadataInTransaction(transaction, 
dataSource, startMetadata, endMetadata);
       }
     };
   }
@@ -788,7 +791,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     )
     {
       @Override
-      protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+      protected SegmentPublishResult updateDataSourceMetadataInTransaction(
           SegmentMetadataTransaction transaction,
           String dataSource,
           DataSourceMetadata startMetadata,
@@ -799,7 +802,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         if (attemptCounter.getAndIncrement() == 0) {
           return SegmentPublishResult.retryableFailure("this failure can be 
retried");
         } else {
-          return super.updateDataSourceMetadataWithHandle(transaction, 
dataSource, startMetadata, endMetadata);
+          return super.updateDataSourceMetadataInTransaction(transaction, 
dataSource, startMetadata, endMetadata);
         }
       }
     };
@@ -921,6 +924,65 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(2, metadataUpdateCounter.get());
   }
 
+  @Test
+  public void test_commitSegmentsAndMetadata_isAtomic()
+  {
+    final String dataSource = defaultSegment.getDataSource();
+    Assert.assertNull(coordinator.retrieveDataSourceMetadata(dataSource));
+
+    // Create an instance which fails to insert segments but updates metadata 
successfully
+    final AtomicBoolean isMetadataUpdated = new AtomicBoolean(false);
+    final IndexerSQLMetadataStorageCoordinator storageCoordinator = new 
IndexerSQLMetadataStorageCoordinator(
+        transactionFactory,
+        mapper,
+        derbyConnectorRule.metadataTablesConfigSupplier().get(),
+        derbyConnector,
+        segmentSchemaManager,
+        CentralizedDatasourceSchemaConfig.create()
+    )
+    {
+      @Override
+      protected Set<DataSegment> insertSegments(
+          SegmentMetadataTransaction transaction,
+          Set<DataSegment> segments,
+          SegmentSchemaMapping segmentSchemaMapping
+      )
+      {
+        throw new RuntimeException("Fail segment insert");
+      }
+
+      @Override
+      protected SegmentPublishResult updateDataSourceMetadataInTransaction(
+          SegmentMetadataTransaction transaction,
+          String dataSource,
+          DataSourceMetadata startMetadata,
+          DataSourceMetadata endMetadata
+      ) throws IOException
+      {
+        isMetadataUpdated.set(true);
+        return super.updateDataSourceMetadataInTransaction(transaction, 
dataSource, startMetadata, endMetadata);
+      }
+    };
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            RuntimeException.class,
+            () -> storageCoordinator.commitSegmentsAndMetadata(
+                Set.of(defaultSegment),
+                new ObjectMetadata(null),
+                new ObjectMetadata(Map.of("foo", "baz")),
+                null
+            )
+        ),
+        ExceptionMatcher.of(RuntimeException.class)
+                        .expectMessageIs("java.lang.RuntimeException: Fail 
segment insert")
+    );
+
+    // Verify that the datasource metadata update succeeded but was rolled back
+    Assert.assertTrue(isMetadataUpdated.get());
+    Assert.assertNull(coordinator.retrieveDataSourceMetadata(dataSource));
+  }
+
   @Test
   public void testRetrieveUsedSegmentForId()
   {
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
index 75d2cd9d2b6..932b2eff087 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
@@ -112,7 +112,7 @@ public class 
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
     )
     {
       @Override
-      protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+      protected SegmentPublishResult updateDataSourceMetadataInTransaction(
           SegmentMetadataTransaction transaction,
           String dataSource,
           DataSourceMetadata startMetadata,
@@ -121,7 +121,7 @@ public class 
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
       {
         // Count number of times this method is called.
         metadataUpdateCounter.getAndIncrement();
-        return super.updateDataSourceMetadataWithHandle(transaction, 
dataSource, startMetadata, endMetadata);
+        return super.updateDataSourceMetadataInTransaction(transaction, 
dataSource, startMetadata, endMetadata);
       }
     };
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to