This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new aea64989907 Add test for atomicity of commitSegmentsAndMetadata (#17964)
aea64989907 is described below
commit aea6498990775994860ef78987bf2c438c5477cd
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu May 1 09:40:21 2025 +0530
Add test for atomicity of commitSegmentsAndMetadata (#17964)
Changes:
- Add unit test to verify atomicity of `IndexerMetadataStorageCoordinator.commitSegmentsAndMetadata()`
- Improve logs
- Minor style refactor
---
.../IndexerSQLMetadataStorageCoordinator.java | 133 +++++++++++----------
.../IndexerSQLMetadataStorageCoordinatorTest.java | 70 ++++++++++-
...ataStorageCoordinatorSchemaPersistenceTest.java | 4 +-
3 files changed, 136 insertions(+), 71 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 8f448168359..c93ca2516bd 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -26,7 +26,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
@@ -419,7 +418,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
transaction -> {
// Try to update datasource metadata first
if (startMetadata != null) {
- final SegmentPublishResult result =
updateDataSourceMetadataWithHandle(
+ final SegmentPublishResult result =
updateDataSourceMetadataInTransaction(
transaction,
dataSource,
startMetadata,
@@ -432,13 +431,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- final Set<DataSegment> inserted =
- insertSegments(
- transaction,
- segments,
- segmentSchemaMapping
- );
- return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
+ return SegmentPublishResult.ok(
+ Set.copyOf(insertSegments(transaction, segments,
segmentSchemaMapping))
+ );
}
);
}
@@ -563,7 +558,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
try {
return inReadWriteDatasourceTransaction(
dataSource,
- transaction -> updateDataSourceMetadataWithHandle(
+ transaction -> updateDataSourceMetadataInTransaction(
transaction,
dataSource,
startMetadata,
@@ -1199,7 +1194,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// Try to update datasource metadata first
if (startMetadata != null) {
final SegmentPublishResult metadataUpdateResult
- = updateDataSourceMetadataWithHandle(transaction,
dataSource, startMetadata, endMetadata);
+ = updateDataSourceMetadataInTransaction(transaction,
dataSource, startMetadata, endMetadata);
// Abort the transaction if datasource metadata update has failed
if (!metadataUpdateResult.isSuccess()) {
@@ -1659,7 +1654,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
- private Set<DataSegment> insertSegments(
+ protected Set<DataSegment> insertSegments(
final SegmentMetadataTransaction transaction,
final Set<DataSegment> segments,
@Nullable final SegmentSchemaMapping segmentSchemaMapping
@@ -2088,7 +2083,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
/**
* Read dataSource metadata as bytes, from a specific handle. Returns null
if there is no metadata.
*/
- private @Nullable byte[] retrieveDataSourceMetadataWithHandleAsBytes(
+ private @Nullable byte[] retrieveDataSourceMetadataAsBytes(
final SegmentMetadataTransaction transaction,
final String dataSource
)
@@ -2103,22 +2098,25 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
/**
- * Compare-and-swap dataSource metadata in a transaction. This will only
modify dataSource metadata if it equals
- * oldCommitMetadata when this function is called (based on T.equals). This
method is idempotent in that if
- * the metadata already equals newCommitMetadata, it will return true.
- *
- * @param startMetadata dataSource metadata pre-insert must match this
startMetadata according to
- * {@link
DataSourceMetadata#matches(DataSourceMetadata)}
- * @param endMetadata dataSource metadata post-insert will have this
endMetadata merged in with
- * {@link DataSourceMetadata#plus(DataSourceMetadata)}
+ * Compare-and-swap {@link DataSourceMetadata} for a datasource in a
transaction.
+ * This method updates the metadata for the given datasource only if the
+ * currently persisted entry {@link DataSourceMetadata#matches matches} the
+ * {@code startMetadata}. If the current entry in the DB
+ * {@link DataSourceMetadata#matches matches} the {@code endMetadata}, this
+ * method returns immediately with success.
*
- * @return SUCCESS if dataSource metadata was updated from matching
startMetadata to matching endMetadata, FAILURE or
- * TRY_AGAIN if it definitely was not updated. This guarantee is meant to
help
- * {@link #commitSegmentsAndMetadata} achieve its own guarantee.
- *
- * @throws RuntimeException if state is unknown after this call
+ * @param startMetadata Current entry in the DB must
+ * {@link DataSourceMetadata#matches match} this value.
+ * @param endMetadata The updated entry will be equal to the current entry
+ * {@link DataSourceMetadata#plus(DataSourceMetadata)
plus}
+ * this value.
+ * @return Successful {@link SegmentPublishResult} if the metadata in the DB
+ * was successfully updated, or if the current entry already matches
endMetadata.
+ * Otherwise, returns a failed {@link SegmentPublishResult} (retryable or
not).
+ * @throws IOException if an error occurred while serializing or
deserializing.
+ * @throws NullPointerException if any of the arguments is null.
*/
- protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataInTransaction(
final SegmentMetadataTransaction transaction,
final String dataSource,
final DataSourceMetadata startMetadata,
@@ -2129,7 +2127,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Preconditions.checkNotNull(startMetadata, "startMetadata");
Preconditions.checkNotNull(endMetadata, "endMetadata");
- final byte[] oldCommitMetadataBytesFromDb =
retrieveDataSourceMetadataWithHandleAsBytes(transaction, dataSource);
+ final byte[] oldCommitMetadataBytesFromDb =
+ retrieveDataSourceMetadataAsBytes(transaction, dataSource);
final String oldCommitMetadataSha1FromDb;
final DataSourceMetadata oldCommitMetadataFromDb;
@@ -2194,54 +2193,58 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
);
- final SegmentPublishResult retVal;
+ final SegmentPublishResult publishResult;
if (oldCommitMetadataBytesFromDb == null) {
// SELECT -> INSERT can fail due to races; callers must be prepared to
retry.
- final int numRows = transaction.getHandle().createStatement(
- StringUtils.format(
- "INSERT INTO %s (dataSource, created_date,
commit_metadata_payload, commit_metadata_sha1) "
- + "VALUES (:dataSource, :created_date, :commit_metadata_payload,
:commit_metadata_sha1)",
- dbTables.getDataSourceTable()
- )
- )
- .bind("dataSource", dataSource)
- .bind("created_date",
DateTimes.nowUtc().toString())
- .bind("commit_metadata_payload",
newCommitMetadataBytes)
- .bind("commit_metadata_sha1",
newCommitMetadataSha1)
- .execute();
-
- retVal = numRows == 1
+ final String insertSql = StringUtils.format(
+ "INSERT INTO %s (dataSource, created_date, commit_metadata_payload,
commit_metadata_sha1) "
+ + "VALUES (:dataSource, :created_date, :commit_metadata_payload,
:commit_metadata_sha1)",
+ dbTables.getDataSourceTable()
+ );
+ final int numRows = transaction.getHandle().createStatement(insertSql)
+ .bind("dataSource", dataSource)
+ .bind("created_date",
DateTimes.nowUtc().toString())
+ .bind("commit_metadata_payload",
newCommitMetadataBytes)
+ .bind("commit_metadata_sha1",
newCommitMetadataSha1)
+ .execute();
+
+ publishResult = numRows == 1
? SegmentPublishResult.ok(Set.of())
- : SegmentPublishResult.retryableFailure("Failed to insert metadata
for datasource[%s]", dataSource);
+ : SegmentPublishResult.retryableFailure("Insert failed");
} else {
// Expecting a particular old metadata; use the SHA1 in a
compare-and-swap UPDATE
- final int numRows = transaction.getHandle().createStatement(
- StringUtils.format(
- "UPDATE %s SET "
- + "commit_metadata_payload = :new_commit_metadata_payload, "
- + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
- + "WHERE dataSource = :dataSource AND commit_metadata_sha1 =
:old_commit_metadata_sha1",
- dbTables.getDataSourceTable()
- )
- )
- .bind("dataSource", dataSource)
- .bind("old_commit_metadata_sha1",
oldCommitMetadataSha1FromDb)
- .bind("new_commit_metadata_payload",
newCommitMetadataBytes)
- .bind("new_commit_metadata_sha1",
newCommitMetadataSha1)
- .execute();
-
- retVal = numRows == 1
+ final String updateSql = StringUtils.format(
+ "UPDATE %s SET "
+ + "commit_metadata_payload = :new_commit_metadata_payload, "
+ + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+ + "WHERE dataSource = :dataSource AND commit_metadata_sha1 =
:old_commit_metadata_sha1",
+ dbTables.getDataSourceTable()
+ );
+ final int numRows = transaction.getHandle().createStatement(updateSql)
+ .bind("dataSource", dataSource)
+ .bind("old_commit_metadata_sha1",
oldCommitMetadataSha1FromDb)
+ .bind("new_commit_metadata_payload",
newCommitMetadataBytes)
+ .bind("new_commit_metadata_sha1",
newCommitMetadataSha1)
+ .execute();
+
+ publishResult = numRows == 1
? SegmentPublishResult.ok(Set.of())
- : SegmentPublishResult.retryableFailure("Failed to update metadata
for datasource[%s]", dataSource);
+ : SegmentPublishResult.retryableFailure("Compare-and-swap update
failed");
}
- if (retVal.isSuccess()) {
- log.info("Updated metadata from[%s] to[%s].", oldCommitMetadataFromDb,
newCommitMetadata);
+ if (publishResult.isSuccess()) {
+ log.info(
+ "Updated metadata for datasource[%s] from[%s] to[%s].",
+ dataSource, oldCommitMetadataFromDb, newCommitMetadata
+ );
} else {
- log.info("Not updating metadata, compare-and-swap failure.");
+ log.info(
+ "Failed to update metadata for datasource[%s] due to reason[%s].",
+ dataSource, publishResult.getErrorMsg()
+ );
}
- return retVal;
+ return publishResult;
}
@Override
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index b902b1be608..f7c5af17ecf 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -68,6 +69,7 @@ import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.assertj.core.api.Assertions;
+import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
@@ -90,6 +92,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -195,7 +198,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
)
{
@Override
- protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataInTransaction(
SegmentMetadataTransaction transaction,
String dataSource,
DataSourceMetadata startMetadata,
@@ -204,7 +207,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
{
// Count number of times this method is called.
metadataUpdateCounter.getAndIncrement();
- return super.updateDataSourceMetadataWithHandle(transaction,
dataSource, startMetadata, endMetadata);
+ return super.updateDataSourceMetadataInTransaction(transaction,
dataSource, startMetadata, endMetadata);
}
};
}
@@ -788,7 +791,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
)
{
@Override
- protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataInTransaction(
SegmentMetadataTransaction transaction,
String dataSource,
DataSourceMetadata startMetadata,
@@ -799,7 +802,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
if (attemptCounter.getAndIncrement() == 0) {
return SegmentPublishResult.retryableFailure("this failure can be
retried");
} else {
- return super.updateDataSourceMetadataWithHandle(transaction,
dataSource, startMetadata, endMetadata);
+ return super.updateDataSourceMetadataInTransaction(transaction,
dataSource, startMetadata, endMetadata);
}
}
};
@@ -921,6 +924,65 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals(2, metadataUpdateCounter.get());
}
+ @Test
+ public void test_commitSegmentsAndMetadata_isAtomic()
+ {
+ final String dataSource = defaultSegment.getDataSource();
+ Assert.assertNull(coordinator.retrieveDataSourceMetadata(dataSource));
+
+ // Create an instance which fails to insert segments but updates metadata
successfully
+ final AtomicBoolean isMetadataUpdated = new AtomicBoolean(false);
+ final IndexerSQLMetadataStorageCoordinator storageCoordinator = new
IndexerSQLMetadataStorageCoordinator(
+ transactionFactory,
+ mapper,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ derbyConnector,
+ segmentSchemaManager,
+ CentralizedDatasourceSchemaConfig.create()
+ )
+ {
+ @Override
+ protected Set<DataSegment> insertSegments(
+ SegmentMetadataTransaction transaction,
+ Set<DataSegment> segments,
+ SegmentSchemaMapping segmentSchemaMapping
+ )
+ {
+ throw new RuntimeException("Fail segment insert");
+ }
+
+ @Override
+ protected SegmentPublishResult updateDataSourceMetadataInTransaction(
+ SegmentMetadataTransaction transaction,
+ String dataSource,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ ) throws IOException
+ {
+ isMetadataUpdated.set(true);
+ return super.updateDataSourceMetadataInTransaction(transaction,
dataSource, startMetadata, endMetadata);
+ }
+ };
+
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ RuntimeException.class,
+ () -> storageCoordinator.commitSegmentsAndMetadata(
+ Set.of(defaultSegment),
+ new ObjectMetadata(null),
+ new ObjectMetadata(Map.of("foo", "baz")),
+ null
+ )
+ ),
+ ExceptionMatcher.of(RuntimeException.class)
+ .expectMessageIs("java.lang.RuntimeException: Fail
segment insert")
+ );
+
+ // Verify that the datasource metadata update succeeded but was rolled back
+ Assert.assertTrue(isMetadataUpdated.get());
+ Assert.assertNull(coordinator.retrieveDataSourceMetadata(dataSource));
+ }
+
@Test
public void testRetrieveUsedSegmentForId()
{
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
index 75d2cd9d2b6..932b2eff087 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
@@ -112,7 +112,7 @@ public class
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
)
{
@Override
- protected SegmentPublishResult updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataInTransaction(
SegmentMetadataTransaction transaction,
String dataSource,
DataSourceMetadata startMetadata,
@@ -121,7 +121,7 @@ public class
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
{
// Count number of times this method is called.
metadataUpdateCounter.getAndIncrement();
- return super.updateDataSourceMetadataWithHandle(transaction,
dataSource, startMetadata, endMetadata);
+ return super.updateDataSourceMetadataInTransaction(transaction,
dataSource, startMetadata, endMetadata);
}
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]