This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 598a921 Use isTransient=false for ZCS sstables
598a921 is described below
commit 598a92180e2ad95b48419605d270c53497739f35
Author: Zhao Yang <[email protected]>
AuthorDate: Mon May 11 04:29:47 2020 +0800
Use isTransient=false for ZCS sstables
patch by Zhao Yang; reviewed by Blake Eggleston, Dinesh Joshi, Ekaterina
Dimitrova for CASSANDRA-15783
---
CHANGES.txt | 1 +
doc/source/new/transientreplication.rst | 4 ++--
.../CassandraEntireSSTableStreamReader.java | 9 ++++++--
.../io/sstable/metadata/IMetadataSerializer.java | 10 +++++++++
.../io/sstable/metadata/MetadataSerializer.java | 25 +++++++++++++---------
5 files changed, 35 insertions(+), 14 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 01f74f0..1be10dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783)
* Provide ability to configure IAuditLogger (CASSANDRA-15748)
* Fix nodetool enablefullquerylog blocking param parsing (CASSANDRA-15819)
* Add isTransient to SSTableMetadataView (CASSANDRA-15806)
diff --git a/doc/source/new/transientreplication.rst b/doc/source/new/transientreplication.rst
index 438f437..aa39a11 100644
--- a/doc/source/new/transientreplication.rst
+++ b/doc/source/new/transientreplication.rst
@@ -33,7 +33,7 @@ Certain nodes act as full replicas (storing all the data for
a given token range
The optimization that is made possible with transient replication is called
"Cheap quorums", which implies that data redundancy is increased without
corresponding increase in storage usage.
-Transient replication is useful when sufficient full replicas are unavailable to receive and store all the data.
+Transient replication is useful when sufficient full replicas are available to receive and store all the data.
Transient replication allows you to configure a subset of replicas to only
replicate data that hasn't been incrementally repaired.
As an optimization, we can avoid writing data to a transient replica if we
have successfully written data to the full replicas.
@@ -55,7 +55,7 @@ As an example, create a keyspace with replication factor (RF)
3.
::
CREATE KEYSPACE CassandraKeyspaceSimple WITH replication = {'class':
'SimpleStrategy',
- 'replication_factor' : 4/1};
+ 'replication_factor' : 3/1};
As another example, ``some_keysopace keyspace`` will have 3 replicas in DC1, 1
of which is transient, and 5 replicas in DC2, 2 of which are transient:
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 479ee71..eac37d1 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import com.google.common.base.Function;
import com.google.common.base.Throwables;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
@@ -29,12 +30,12 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.ProgressInfo;
@@ -54,6 +55,7 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
private final TableId tableId;
private final StreamSession session;
+ private final StreamMessageHeader messageHeader;
private final CassandraStreamHeader header;
private final int fileSequenceNumber;
@@ -71,6 +73,7 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
this.header = streamHeader;
this.session = session;
+ this.messageHeader = messageHeader;
this.tableId = messageHeader.tableId;
this.fileSequenceNumber = messageHeader.sequenceNumber;
}
@@ -132,7 +135,9 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
prettyPrintMemory(totalSize));
}
-
writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor,
header.sstableLevel);
+ Function<StatsMetadata, StatsMetadata> transform = stats ->
stats.mutateLevel(header.sstableLevel)
+
.mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair,
false);
+
writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, transform);
return writer;
}
catch (Throwable e)
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index c842d02..db9f161 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.Version;
@@ -62,6 +63,15 @@ public interface IMetadataSerializer
MetadataComponent deserialize(Descriptor descriptor, MetadataType type)
throws IOException;
/**
+ * Mutate SSTable Metadata
+ *
+ * @param descriptor SSTable descriptor
+ * @param transform function to mutate sstable metadata
+ * @throws IOException
+ */
+ public void mutate(Descriptor descriptor, Function<StatsMetadata,
StatsMetadata> transform) throws IOException;
+
+ /**
* Mutate SSTable level
*
* @param descriptor SSTable descriptor
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 9cb9a20..0ae7c32 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable.metadata;
import java.io.*;
import java.util.*;
+import java.util.function.Function;
import java.util.zip.CRC32;
import com.google.common.collect.Lists;
@@ -219,27 +220,31 @@ public class MetadataSerializer implements
IMetadataSerializer
}
}
- public void mutateLevel(Descriptor descriptor, int newLevel) throws
IOException
+ @Override
+ public void mutate(Descriptor descriptor, Function<StatsMetadata,
StatsMetadata> transform) throws IOException
{
- if (logger.isTraceEnabled())
- logger.trace("Mutating {} to level {}",
descriptor.filenameFor(Component.STATS), newLevel);
Map<MetadataType, MetadataComponent> currentComponents =
deserialize(descriptor, EnumSet.allOf(MetadataType.class));
StatsMetadata stats = (StatsMetadata)
currentComponents.remove(MetadataType.STATS);
- // mutate level
- currentComponents.put(MetadataType.STATS, stats.mutateLevel(newLevel));
+
+ currentComponents.put(MetadataType.STATS, transform.apply(stats));
rewriteSSTableMetadata(descriptor, currentComponents);
}
+ public void mutateLevel(Descriptor descriptor, int newLevel) throws
IOException
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Mutating {} to level {}",
descriptor.filenameFor(Component.STATS), newLevel);
+
+ mutate(descriptor, stats -> stats.mutateLevel(newLevel));
+ }
+
public void mutateRepairMetadata(Descriptor descriptor, long
newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
{
if (logger.isTraceEnabled())
logger.trace("Mutating {} to repairedAt time {} and pendingRepair
{}",
descriptor.filenameFor(Component.STATS),
newRepairedAt, newPendingRepair);
- Map<MetadataType, MetadataComponent> currentComponents =
deserialize(descriptor, EnumSet.allOf(MetadataType.class));
- StatsMetadata stats = (StatsMetadata)
currentComponents.remove(MetadataType.STATS);
- // mutate time & id
- currentComponents.put(MetadataType.STATS,
stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
- rewriteSSTableMetadata(descriptor, currentComponents);
+
+ mutate(descriptor, stats ->
stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
}
public void rewriteSSTableMetadata(Descriptor descriptor,
Map<MetadataType, MetadataComponent> currentComponents) throws IOException
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]