This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 50d186fc5207a942417cb960506fabe2f3ab71df
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 30 15:36:21 2023 +0100

    [CEP-21] Ensure that ClusterMetadata::forceEpoch keeps component epochs consistent
    
    patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov
    for CASSANDRA-18460
---
 .../org/apache/cassandra/tcm/ClusterMetadata.java  | 43 ++++++++++++++++++----
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index ab412cfd05..a5467e3756 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -176,17 +176,46 @@ public class ClusterMetadata
 
     public ClusterMetadata forceEpoch(Epoch epoch)
     {
+        // In certain circumstances, the last modified epoch of the individual
+        // components may have been updated beyond the epoch we're specifying here.
+        // An example is the execution of an UnsafeJoin transformation, where the
+        // sub-steps (Start/Mid/Finish) are executed in series, each updating a
+        // single ClusterMetadata and its individual components. At the end of that
+        // sequence, the CM epoch is then set forcibly to ensure the UnsafeJoin only
+        // increments the published epoch by one. As each component has its own last
+        // modified epoch, we may also need to coerce those, but only if they are
+        // greater than the epoch we're forcing here.
         return new ClusterMetadata(epoch,
                                    period,
                                    lastInPeriod,
                                    partitioner,
-                                   schema,
-                                   directory,
-                                   tokenMap,
-                                   placements,
-                                   lockedRanges,
-                                   inProgressSequences,
-                                   extensions);
+                                   capLastModified(schema, epoch),
+                                   capLastModified(directory, epoch),
+                                   capLastModified(tokenMap, epoch),
+                                   capLastModified(placements, epoch),
+                                   capLastModified(lockedRanges, epoch),
+                                   capLastModified(inProgressSequences, epoch),
+                                   capLastModified(extensions, epoch));
+    }
+
+    private static Map<ExtensionKey<?,?>, ExtensionValue<?>> capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch maxEpoch)
+    {
+        Map<ExtensionKey<?, ?>, ExtensionValue<?>> updated = new HashMap<>();
+        original.forEach((key, value) -> {
+            ExtensionValue<?> newValue = value == null || value.lastModified().isEqualOrBefore(maxEpoch)
+                                         ? value
+                                         : (ExtensionValue<?>)value.withLastModified(maxEpoch);
+            updated.put(key, newValue);
+        });
+        return updated;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <V> V capLastModified(MetadataValue<V> value, Epoch maxEpoch)
+    {
+        return value == null || value.lastModified().isEqualOrBefore(maxEpoch)
+               ? (V)value
+               : value.withLastModified(maxEpoch);
     }
 
     public Epoch nextEpoch()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to