This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 50d186fc5207a942417cb960506fabe2f3ab71df
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 30 15:36:21 2023 +0100

    [CEP-21] Ensure that ClusterMetadata::forceEpoch keeps component epochs consistent

    patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov
    for CASSANDRA-18460
---
 .../org/apache/cassandra/tcm/ClusterMetadata.java | 43 ++++++++++++++++++----
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index ab412cfd05..a5467e3756 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -176,17 +176,46 @@ public class ClusterMetadata
 
     public ClusterMetadata forceEpoch(Epoch epoch)
     {
+        // In certain circumstances, the last modified epoch of the individual
+        // components may have been updated beyond the epoch we're specifying here.
+        // An example is the execution of an UnsafeJoin transformation, where the
+        // sub-steps (Start/Mid/Finish) are executed in series, each updating a
+        // single ClusterMetadata and its individual components. At the end of that
+        // sequence, the CM epoch is then set forcibly to ensure the UnsafeJoin only
+        // increments the published epoch by one. As each component has its own last
+        // modified epoch, we may also need to coerce those, but only if they are
+        // greater than the epoch we're forcing here.
         return new ClusterMetadata(epoch,
                                    period,
                                    lastInPeriod,
                                    partitioner,
-                                   schema,
-                                   directory,
-                                   tokenMap,
-                                   placements,
-                                   lockedRanges,
-                                   inProgressSequences,
-                                   extensions);
+                                   capLastModified(schema, epoch),
+                                   capLastModified(directory, epoch),
+                                   capLastModified(tokenMap, epoch),
+                                   capLastModified(placements, epoch),
+                                   capLastModified(lockedRanges, epoch),
+                                   capLastModified(inProgressSequences, epoch),
+                                   capLastModified(extensions, epoch));
+    }
+
+    private static Map<ExtensionKey<?,?>, ExtensionValue<?>> capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch maxEpoch)
+    {
+        Map<ExtensionKey<?, ?>, ExtensionValue<?>> updated = new HashMap<>();
+        original.forEach((key, value) -> {
+            ExtensionValue<?> newValue = value == null || value.lastModified().isEqualOrBefore(maxEpoch)
+                                         ? value
+                                         : (ExtensionValue<?>)value.withLastModified(maxEpoch);
+            updated.put(key, newValue);
+        });
+        return updated;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <V> V capLastModified(MetadataValue<V> value, Epoch maxEpoch)
+    {
+        return value == null || value.lastModified().isEqualOrBefore(maxEpoch)
+               ? (V)value
+               : value.withLastModified(maxEpoch);
     }
 
     public Epoch nextEpoch()

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
