This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 598a921  Use isTransient=false for ZCS sstables
598a921 is described below

commit 598a92180e2ad95b48419605d270c53497739f35
Author: Zhao Yang <[email protected]>
AuthorDate: Mon May 11 04:29:47 2020 +0800

    Use isTransient=false for ZCS sstables
    
     patch by Zhao Yang; reviewed by Blake Eggleston, Dinesh Joshi, Ekaterina 
Dimitrova for CASSANDRA-15783
---
 CHANGES.txt                                        |  1 +
 doc/source/new/transientreplication.rst            |  4 ++--
 .../CassandraEntireSSTableStreamReader.java        |  9 ++++++--
 .../io/sstable/metadata/IMetadataSerializer.java   | 10 +++++++++
 .../io/sstable/metadata/MetadataSerializer.java    | 25 +++++++++++++---------
 5 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 01f74f0..1be10dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Speed up entire-file-streaming file containment check and allow 
entire-file-streaming for all compaction strategies 
(CASSANDRA-15657, CASSANDRA-15783)
  * Provide ability to configure IAuditLogger (CASSANDRA-15748)
  * Fix nodetool enablefullquerylog blocking param parsing (CASSANDRA-15819)
  * Add isTransient to SSTableMetadataView (CASSANDRA-15806)
diff --git a/doc/source/new/transientreplication.rst 
b/doc/source/new/transientreplication.rst
index 438f437..aa39a11 100644
--- a/doc/source/new/transientreplication.rst
+++ b/doc/source/new/transientreplication.rst
@@ -33,7 +33,7 @@ Certain nodes act as full replicas (storing all the data for 
a given token range
 
 The optimization that is made possible with transient replication is called 
"Cheap quorums", which implies that data redundancy is increased without 
corresponding increase in storage usage.
 
-Transient replication is useful when sufficient full replicas are unavailable 
to receive and store all the data.
+Transient replication is useful when sufficient full replicas are available to 
receive and store all the data.
 Transient replication allows you to configure a subset of replicas to only 
replicate data that hasn't been incrementally repaired.
 As an optimization, we can avoid writing data to a transient replica if we 
have successfully written data to the full replicas.
 
@@ -55,7 +55,7 @@ As an example, create a keyspace with replication factor (RF) 
3.
 ::
 
  CREATE KEYSPACE CassandraKeyspaceSimple WITH replication = {'class': 
'SimpleStrategy',
- 'replication_factor' : 4/1};
+ 'replication_factor' : 3/1};
 
 
 As another example, the ``some_keyspace`` keyspace will have 3 replicas in DC1, 1 
of which is transient, and 5 replicas in DC2, 2 of which are transient:
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 479ee71..eac37d1 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 
+import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.slf4j.Logger;
@@ -29,12 +30,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.ProgressInfo;
@@ -54,6 +55,7 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
 
     private final TableId tableId;
     private final StreamSession session;
+    private final StreamMessageHeader messageHeader;
     private final CassandraStreamHeader header;
     private final int fileSequenceNumber;
 
@@ -71,6 +73,7 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
 
         this.header = streamHeader;
         this.session = session;
+        this.messageHeader = messageHeader;
         this.tableId = messageHeader.tableId;
         this.fileSequenceNumber = messageHeader.sequenceNumber;
     }
@@ -132,7 +135,9 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
                              prettyPrintMemory(totalSize));
             }
 
-            
writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor, 
header.sstableLevel);
+            Function<StatsMetadata, StatsMetadata> transform = stats -> 
stats.mutateLevel(header.sstableLevel)
+                                                                             
.mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, 
false);
+            
writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, transform);
             return writer;
         }
         catch (Throwable e)
diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index c842d02..db9f161 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -62,6 +63,15 @@ public interface IMetadataSerializer
     MetadataComponent deserialize(Descriptor descriptor, MetadataType type) 
throws IOException;
 
     /**
+     * Mutate SSTable Metadata
+     *
+     * @param descriptor SSTable descriptor
+     * @param transform function to mutate sstable metadata
+     * @throws IOException
+     */
+    public void mutate(Descriptor descriptor, Function<StatsMetadata, 
StatsMetadata> transform) throws IOException;
+
+    /**
      * Mutate SSTable level
      *
      * @param descriptor SSTable descriptor
diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 9cb9a20..0ae7c32 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable.metadata;
 
 import java.io.*;
 import java.util.*;
+import java.util.function.Function;
 import java.util.zip.CRC32;
 
 import com.google.common.collect.Lists;
@@ -219,27 +220,31 @@ public class MetadataSerializer implements 
IMetadataSerializer
         }
     }
 
-    public void mutateLevel(Descriptor descriptor, int newLevel) throws 
IOException
+    @Override
+    public void mutate(Descriptor descriptor, Function<StatsMetadata, 
StatsMetadata> transform) throws IOException
     {
-        if (logger.isTraceEnabled())
-            logger.trace("Mutating {} to level {}", 
descriptor.filenameFor(Component.STATS), newLevel);
         Map<MetadataType, MetadataComponent> currentComponents = 
deserialize(descriptor, EnumSet.allOf(MetadataType.class));
         StatsMetadata stats = (StatsMetadata) 
currentComponents.remove(MetadataType.STATS);
-        // mutate level
-        currentComponents.put(MetadataType.STATS, stats.mutateLevel(newLevel));
+
+        currentComponents.put(MetadataType.STATS, transform.apply(stats));
         rewriteSSTableMetadata(descriptor, currentComponents);
     }
 
+    public void mutateLevel(Descriptor descriptor, int newLevel) throws 
IOException
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("Mutating {} to level {}", 
descriptor.filenameFor(Component.STATS), newLevel);
+
+        mutate(descriptor, stats -> stats.mutateLevel(newLevel));
+    }
+
     public void mutateRepairMetadata(Descriptor descriptor, long 
newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
     {
         if (logger.isTraceEnabled())
             logger.trace("Mutating {} to repairedAt time {} and pendingRepair 
{}",
                          descriptor.filenameFor(Component.STATS), 
newRepairedAt, newPendingRepair);
-        Map<MetadataType, MetadataComponent> currentComponents = 
deserialize(descriptor, EnumSet.allOf(MetadataType.class));
-        StatsMetadata stats = (StatsMetadata) 
currentComponents.remove(MetadataType.STATS);
-        // mutate time & id
-        currentComponents.put(MetadataType.STATS, 
stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
-        rewriteSSTableMetadata(descriptor, currentComponents);
+
+        mutate(descriptor, stats -> 
stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
     }
 
     public void rewriteSSTableMetadata(Descriptor descriptor, 
Map<MetadataType, MetadataComponent> currentComponents) throws IOException


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to