This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cep-7-sai
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-7-sai by this push:
new 5d3f257477 Stream all compatible components registered by an SSTable
5d3f257477 is described below
commit 5d3f257477cba2d7f33f842dba4582d0660f5738
Author: Piotr Kołaczkowski <[email protected]>
AuthorDate: Thu Jun 15 15:07:18 2023 +0200
Stream all compatible components registered by an SSTable
patch by Piotr Kołaczkowski; reviewed by Andrés de la Peña and Caleb
Rackliffe for CASSANDRA-18345
---
.../CassandraEntireSSTableStreamReader.java | 2 +-
.../db/streaming/CassandraOutgoingFile.java | 4 +-
.../cassandra/db/streaming/ComponentContext.java | 6 +-
.../cassandra/db/streaming/ComponentManifest.java | 12 +-
.../index/sai/disk/format/IndexComponent.java | 18 +++
.../cassandra/index/sai/disk/format/Version.java | 9 +-
.../index/sai/disk/v1/V1OnDiskFormat.java | 14 +-
.../org/apache/cassandra/io/sstable/Component.java | 26 ++--
.../org/apache/cassandra/io/sstable/SSTable.java | 11 ++
.../io/sstable/SSTableZeroCopyWriter.java | 26 ++--
.../org/apache/cassandra/io/sstable/SSTable_API.md | 14 +-
.../cassandra/io/sstable/format/SSTableFormat.java | 19 ++-
.../cassandra/io/sstable/format/big/BigFormat.java | 20 +--
.../cassandra/io/sstable/format/bti/BtiFormat.java | 19 +--
.../distributed/test/sai/IndexStreamingTest.java | 155 +++++++++++++++++++++
.../microbench/ZeroCopyStreamingBenchmark.java | 2 +-
.../CassandraEntireSSTableStreamWriterTest.java | 4 +-
.../db/streaming/CassandraStreamHeaderTest.java | 4 +-
.../apache/cassandra/io/sstable/ComponentTest.java | 20 +--
.../io/sstable/SSTableZeroCopyWriterTest.java | 2 +-
...ntireSSTableStreamingCorrectFilesCountTest.java | 2 +-
21 files changed, 278 insertions(+), 111 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 9c5e048a12..98e2b6f7ef 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -121,7 +121,7 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
prettyPrintMemory(bytesRead),
prettyPrintMemory(totalSize));
- writer.writeComponent(component.type, in, length);
+ writer.writeComponent(component, in, length);
session.progress(writer.descriptor.fileFor(component).toString(),
ProgressInfo.Direction.IN, length, length, length);
bytesRead += length;
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 88ecff8a44..9569769eba 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -65,7 +65,7 @@ public class CassandraOutgoingFile implements OutgoingStream
this.filename = sstable.getFilename();
this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
- ComponentManifest manifest =
ComponentManifest.create(sstable.descriptor);
+ ComponentManifest manifest = ComponentManifest.create(sstable);
this.header = makeHeader(sstable, operation, sections, estimatedKeys,
shouldStreamEntireSSTable, manifest);
}
@@ -154,7 +154,7 @@ public class CassandraOutgoingFile implements OutgoingStream
// redistribution, otherwise file sizes recorded in component
manifest will be different from actual
// file sizes.
// Recreate the latest manifest and hard links for mutatable
components in case they are modified.
- try (ComponentContext context = sstable.runWithLock(ignored ->
ComponentContext.create(sstable.descriptor)))
+ try (ComponentContext context = sstable.runWithLock(ignored ->
ComponentContext.create(sstable)))
{
CassandraStreamHeader current = makeHeader(sstable, operation,
sections, estimatedKeys, true, context.manifest());
CassandraStreamHeader.serializer.serialize(current, out,
version);
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
index 164dd6ba5c..c03e7b4c34 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
@@ -44,8 +45,9 @@ public class ComponentContext implements AutoCloseable
this.manifest = manifest;
}
- public static ComponentContext create(Descriptor descriptor)
+ public static ComponentContext create(SSTable sstable)
{
+ Descriptor descriptor = sstable.descriptor;
Map<Component, File> hardLinks = new HashMap<>(1);
for (Component component : descriptor.getFormat().mutableComponents())
@@ -59,7 +61,7 @@ public class ComponentContext implements AutoCloseable
hardLinks.put(component, hardlink);
}
- return new ComponentContext(hardLinks,
ComponentManifest.create(descriptor));
+ return new ComponentContext(hardLinks,
ComponentManifest.create(sstable));
}
public ComponentManifest manifest()
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
index 5e3cc0c6b6..f220be3a48 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
@@ -33,7 +34,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -52,13 +53,14 @@ public final class ComponentManifest implements
Iterable<Component>
}
@VisibleForTesting
- public static ComponentManifest create(Descriptor descriptor)
+ public static ComponentManifest create(SSTable sstable)
{
- LinkedHashMap<Component, Long> components = new
LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
+ Set<Component> streamingComponents = sstable.getStreamingComponents();
+ LinkedHashMap<Component, Long> components = new
LinkedHashMap<>(streamingComponents.size());
- for (Component component :
descriptor.getFormat().streamingComponents())
+ for (Component component : streamingComponents)
{
- File file = descriptor.fileFor(component);
+ File file = sstable.descriptor.fileFor(component);
if (!file.exists())
continue;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java
index 670ba82c09..4d802ccae9 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java
@@ -18,8 +18,15 @@
package org.apache.cassandra.index.sai.disk.format;
+import java.util.regex.Pattern;
+
import org.apache.cassandra.index.sai.disk.v1.postings.PostingsWriter;
import org.apache.cassandra.index.sai.disk.v1.trie.TrieTermsDictionaryWriter;
+import org.apache.cassandra.io.sstable.Component;
+
+import static
org.apache.cassandra.index.sai.disk.format.Version.SAI_DESCRIPTOR;
+import static org.apache.cassandra.index.sai.disk.format.Version.SAI_SEPARATOR;
+
/**
* This is a definitive list of all the on-disk components for all versions
@@ -80,9 +87,20 @@ public enum IndexComponent
GROUP_COMPLETION_MARKER("GroupComplete");
public final String name;
+ public final Component.Type type;
IndexComponent(String name)
{
this.name = name;
+ this.type = componentType(name);
+ }
+
+ private static Component.Type componentType(String name)
+ {
+ String componentName = SAI_DESCRIPTOR + SAI_SEPARATOR + name;
+ String repr = Pattern.quote(SAI_DESCRIPTOR + SAI_SEPARATOR)
+ + ".*"
+ + Pattern.quote(SAI_SEPARATOR + name + ".db");
+ return Component.Type.create(componentName, repr, true, null);
}
}
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
index 16e84d94c7..8dd57581a3 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
/**
* Format version of indexing component, denoted as [major][minor]. Same
forward-compatibility rules apply as to
@@ -37,8 +36,8 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
*/
public class Version implements Comparable<Version>
{
- private static final String SAI_DESCRIPTOR = "SAI";
- private static final String SAI_SEPARATOR = "+";
+ public static final String SAI_DESCRIPTOR = "SAI";
+ public static final String SAI_SEPARATOR = "+";
// Current version
public static final Version AA = new Version("aa",
V1OnDiskFormat.instance, (c, i) -> defaultFileNameFormat(c, i, "aa"));
@@ -113,12 +112,12 @@ public class Version implements Comparable<Version>
public Component makePerSSTableComponent(IndexComponent indexComponent)
{
- return
SSTableFormat.Components.Types.CUSTOM.createComponent(fileNameFormatter.format(indexComponent,
null));
+ return
indexComponent.type.createComponent(fileNameFormatter.format(indexComponent,
null));
}
public Component makePerIndexComponent(IndexComponent indexComponent,
IndexContext indexContext)
{
- return
SSTableFormat.Components.Types.CUSTOM.createComponent(fileNameFormatter.format(indexComponent,
indexContext));
+ return
indexComponent.type.createComponent(fileNameFormatter.format(indexComponent,
indexContext));
}
public FileNameFormatter fileNameFormatter()
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index c55d4b07ef..189d9b6a5f 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,13 +55,15 @@ public class V1OnDiskFormat implements OnDiskFormat
{
private static final Logger logger =
LoggerFactory.getLogger(V1OnDiskFormat.class);
- private static final Set<IndexComponent> PER_SSTABLE_COMPONENTS =
EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
+ @VisibleForTesting
+ public static final Set<IndexComponent> PER_SSTABLE_COMPONENTS =
EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
IndexComponent.GROUP_META,
IndexComponent.TOKEN_VALUES,
IndexComponent.PRIMARY_KEY_TRIE,
IndexComponent.PRIMARY_KEY_BLOCKS,
IndexComponent.PRIMARY_KEY_BLOCK_OFFSETS);
- private static final Set<IndexComponent> LITERAL_COMPONENTS =
EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
+ @VisibleForTesting
+ public static final Set<IndexComponent> LITERAL_COMPONENTS =
EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
IndexComponent.META,
IndexComponent.TERMS_DATA,
IndexComponent.POSTING_LISTS);
@@ -170,10 +173,11 @@ public class V1OnDiskFormat implements OnDiskFormat
{
if (logger.isDebugEnabled())
{
- logger.debug(indexDescriptor.logMessage("{} failed for
index component {} on SSTable {}"),
- (checksum ? "Checksum validation" :
"Validation"),
+ logger.debug(indexDescriptor.logMessage("{} failed for
index component {} on SSTable {}. Error: {}"),
+ checksum ? "Checksum validation" :
"Validation",
indexComponent,
- indexDescriptor.sstableDescriptor);
+ indexDescriptor.sstableDescriptor,
+ e);
}
return false;
}
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index f2eea992a3..0d89cf0b92 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -52,6 +52,7 @@ public class Component
public final int id;
public final String name;
public final String repr;
+ public final boolean streamable;
private final Component singleton;
@SuppressWarnings("rawtypes")
@@ -60,31 +61,34 @@ public class Component
/**
* Creates a new non-singleton type and registers it a global type
registry - see {@link #registerType(Type)}.
*
- * @param name type name, must be unique for this and all
parent formats
- * @param repr the regular expression to be used to recognize a
name represents this type
- * @param formatClass format class for which this type is defined for
+ * @param name type name, must be unique for this and all
parent formats
+ * @param repr the regular expression to be used to recognize
a name represents this type
+ * @param streamable whether components of this type should be
streamed to other nodes
+ * @param formatClass format class for which this type is defined for
*/
- public static Type create(String name, String repr, Class<? extends
SSTableFormat<?, ?>> formatClass)
+ public static Type create(String name, String repr, boolean
streamable, Class<? extends SSTableFormat<?, ?>> formatClass)
{
- return new Type(name, repr, false, formatClass);
+ return new Type(name, repr, false, streamable, formatClass);
}
/**
* Creates a new singleton type and registers it in a global type
registry - see {@link #registerType(Type)}.
*
- * @param name type name, must be unique for this and all
parent formats
- * @param repr the regular expression to be used to recognize a
name represents this type
- * @param formatClass format class for which this type is defined for
+ * @param name type name, must be unique for this and all
parent formats
+ * @param repr the regular expression to be used to recognize
a name represents this type
+ * @param streamable whether components of this type should be
streamed to other nodes
+ * @param formatClass format class for which this type is defined for
*/
- public static Type createSingleton(String name, String repr, Class<?
extends SSTableFormat<?, ?>> formatClass)
+ public static Type createSingleton(String name, String repr, boolean
streamable, Class<? extends SSTableFormat<?, ?>> formatClass)
{
- return new Type(name, repr, true, formatClass);
+ return new Type(name, repr, true, streamable, formatClass);
}
- private Type(String name, String repr, boolean isSingleton, Class<?
extends SSTableFormat<?, ?>> formatClass)
+ private Type(String name, String repr, boolean isSingleton, boolean
streamable, Class<? extends SSTableFormat<?, ?>> formatClass)
{
this.name = Objects.requireNonNull(name);
this.repr = repr;
+ this.streamable = streamable;
this.id = typesCollector.size();
this.formatClass = formatClass == null ? SSTableFormat.class :
formatClass;
this.singleton = isSingleton ? new Component(this) : null;
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 7936e2ea53..475f92beeb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
@@ -162,6 +163,16 @@ public abstract class SSTable
return ImmutableSet.copyOf(components);
}
+ /**
+ * Returns all SSTable components that should be streamed.
+ */
+ public Set<Component> getStreamingComponents()
+ {
+ return components.stream()
+ .filter(c -> c.type.streamable)
+ .collect(Collectors.toSet());
+ }
+
public TableMetadata metadata()
{
return metadata.get();
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
index 91e6490971..5306661b99 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
@@ -23,10 +23,10 @@ import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@ public class SSTableZeroCopyWriter extends SSTable implements
SSTableMultiWriter
private static final Logger logger =
LoggerFactory.getLogger(SSTableZeroCopyWriter.class);
private volatile SSTableReader finalReader;
- private final Map<Component.Type, SequentialWriter> componentWriters;
+ private final Map<String, SequentialWriter> componentWriters; // indexed
by component name
public SSTableZeroCopyWriter(Builder<?, ?> builder,
LifecycleNewTracker lifecycleNewTracker,
@@ -61,12 +61,14 @@ public class SSTableZeroCopyWriter extends SSTable
implements SSTableMultiWriter
lifecycleNewTracker.trackNew(this);
this.componentWriters = new HashMap<>();
- if
(!descriptor.getFormat().streamingComponents().containsAll(components))
- throw new AssertionError(format("Unsupported streaming component
detected %s",
-
Sets.difference(ImmutableSet.copyOf(components),
descriptor.getFormat().streamingComponents())));
+ Set<Component> unsupported = components.stream()
+ .filter(c -> !c.type.streamable)
+ .collect(Collectors.toSet());
+ if (!unsupported.isEmpty())
+ throw new AssertionError(format("Unsupported streaming components
detected: %s", unsupported));
for (Component c : components)
- componentWriters.put(c.type, makeWriter(descriptor, c));
+ componentWriters.put(c.name, makeWriter(descriptor, c));
}
@Override
@@ -195,14 +197,16 @@ public class SSTableZeroCopyWriter extends SSTable
implements SSTableMultiWriter
writer.close();
}
- public void writeComponent(Component.Type type, DataInputPlus in, long
size) throws ClosedChannelException
+ public void writeComponent(Component component, DataInputPlus in, long
size) throws ClosedChannelException
{
- logger.info("Writing component {} to {} length {}", type,
componentWriters.get(type).getPath(), prettyPrintMemory(size));
+ @SuppressWarnings({"resource", "RedundantSuppression"}) // all
writers are closed in close()
+ SequentialWriter writer = componentWriters.get(component.name);
+ logger.info("Writing component {} to {} length {}", component,
writer.getPath(), prettyPrintMemory(size));
if (in instanceof AsyncStreamingInputPlus)
- write((AsyncStreamingInputPlus) in, size,
componentWriters.get(type));
+ write((AsyncStreamingInputPlus) in, size, writer);
else
- write(in, size, componentWriters.get(type));
+ write(in, size, writer);
}
private void write(AsyncStreamingInputPlus in, long size, SequentialWriter
writer) throws ClosedChannelException
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable_API.md b/src/java/org/apache/cassandra/io/sstable/SSTable_API.md
index 6a0440655e..7fb23ca4d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable_API.md
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable_API.md
@@ -77,19 +77,19 @@ Apart from the generic components, each sstable format
implementation may descri
For example, the _big table_ format describes additionally `PRIMARY_INDEX` and
`SUMMARY` singleton types and
the corresponding singleton components (see
[`BigFormat.Components`](format/big/BigFormat.java)).
-Custom types can be created with one of the `Component.Type.create(name, repr,
formatClass)`,
-`Component.Type.createSingleton(name, repr, formatClass)` methods. Each
created type is registered in a global types'
-registry. Types registry is hierarchical which means that an sstable
implementation may use types defined for its
-format class and for all parent format classes (for example, the types defined
for the `BigFormat` class extend the set
-of types defined for the `SSTableFormat` interface).
+Custom types can be created with one of the `Component.Type.create(name, repr,
streamable, formatClass)`,
+`Component.Type.createSingleton(name, repr, streamable, formatClass)` methods.
Each created type is registered in
+a global types' registry. Types registry is hierarchical which means that an
sstable implementation may use types
+defined for its format class and for all parent format classes (for example,
the types defined for the `BigFormat` class
+extend the set of types defined for the `SSTableFormat` interface).
For example, types defined for `BigFormat`:
```java
public static class Types extends SSTableFormat.Components.Types
{
- public static final Component.Type PRIMARY_INDEX =
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class);
- public static final Component.Type SUMMARY =
Component.Type.createSingleton("SUMMARY", "Summary.db", BigFormat.class);
+ public static final Component.Type PRIMARY_INDEX =
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true,
BigFormat.class);
+ public static final Component.Type SUMMARY =
Component.Type.createSingleton("SUMMARY", "Summary.db", true, BigFormat.class);
}
```
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 1caa87b9cc..654880c2c1 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -59,7 +59,6 @@ public interface SSTableFormat<R extends SSTableReader, W
extends SSTableWriter>
*/
Set<Component> allComponents();
- Set<Component> streamingComponents();
Set<Component> primaryComponents();
@@ -156,23 +155,23 @@ public interface SSTableFormat<R extends SSTableReader, W
extends SSTableWriter>
{
// the base data for an sstable: the remaining components can be
regenerated
// based on the data component
- public static final Component.Type DATA =
Component.Type.createSingleton("DATA", "Data.db", null);
+ public static final Component.Type DATA =
Component.Type.createSingleton("DATA", "Data.db", true, null);
// file to hold information about uncompressed data length, chunk
offsets etc.
- public static final Component.Type COMPRESSION_INFO =
Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+ public static final Component.Type COMPRESSION_INFO =
Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true,
null);
// statistical metadata about the content of the sstable
- public static final Component.Type STATS =
Component.Type.createSingleton("STATS", "Statistics.db", null);
+ public static final Component.Type STATS =
Component.Type.createSingleton("STATS", "Statistics.db", true, null);
// serialized bloom filter for the row keys in the sstable
- public static final Component.Type FILTER =
Component.Type.createSingleton("FILTER", "Filter.db", null);
+ public static final Component.Type FILTER =
Component.Type.createSingleton("FILTER", "Filter.db", true, null);
// holds CRC32 checksum of the data file
- public static final Component.Type DIGEST =
Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+ public static final Component.Type DIGEST =
Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
// holds the CRC32 for chunks in an uncompressed file.
- public static final Component.Type CRC =
Component.Type.createSingleton("CRC", "CRC.db", null);
+ public static final Component.Type CRC =
Component.Type.createSingleton("CRC", "CRC.db", true, null);
// table of contents, stores the list of all components for the
sstable
- public static final Component.Type TOC =
Component.Type.createSingleton("TOC", "TOC.txt", null);
+ public static final Component.Type TOC =
Component.Type.createSingleton("TOC", "TOC.txt", false, null);
// built-in secondary index (may exist multiple per sstable)
- public static final Component.Type SECONDARY_INDEX =
Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+ public static final Component.Type SECONDARY_INDEX =
Component.Type.create("SECONDARY_INDEX", "SI_.*.db", false, null);
// custom component, used by e.g. custom compaction strategy
- public static final Component.Type CUSTOM =
Component.Type.create("CUSTOM", null, null);
+ public static final Component.Type CUSTOM =
Component.Type.create("CUSTOM", null, true, null);
}
// singleton components for types that don't need ids
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 97f244ca66..bee089a16d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -82,9 +82,9 @@ public class BigFormat extends
AbstractSSTableFormat<BigTableReader, BigTableWri
public static class Types extends SSTableFormat.Components.Types
{
// index of the row keys with pointers to their positions in the
data file
- public static final Component.Type PRIMARY_INDEX =
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class);
+ public static final Component.Type PRIMARY_INDEX =
Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true,
BigFormat.class);
// holds SSTable Index Summary (sampling of Index component)
- public static final Component.Type SUMMARY =
Component.Type.createSingleton("SUMMARY", "Summary.db", BigFormat.class);
+ public static final Component.Type SUMMARY =
Component.Type.createSingleton("SUMMARY", "Summary.db", true, BigFormat.class);
}
public final static Component PRIMARY_INDEX =
Types.PRIMARY_INDEX.getSingleton();
@@ -109,16 +109,6 @@ public class BigFormat extends
AbstractSSTableFormat<BigTableReader, BigTableWri
SUMMARY,
COMPRESSION_INFO,
STATS);
-
- private static final Set<Component> STREAM_COMPONENTS =
ImmutableSet.of(DATA,
-
PRIMARY_INDEX,
-
STATS,
-
COMPRESSION_INFO,
-
FILTER,
-
SUMMARY,
-
DIGEST,
-
CRC);
-
private static final Set<Component> ALL_COMPONENTS =
ImmutableSet.of(DATA,
PRIMARY_INDEX,
STATS,
@@ -180,12 +170,6 @@ public class BigFormat extends
AbstractSSTableFormat<BigTableReader, BigTableWri
return Components.ALL_COMPONENTS;
}
- @Override
- public Set<Component> streamingComponents()
- {
- return Components.STREAM_COMPONENTS;
- }
-
@Override
public Set<Component> primaryComponents()
{
diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
index 45cba4fcae..d60c3369d6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
@@ -71,23 +71,14 @@ public class BtiFormat extends
AbstractSSTableFormat<BtiTableReader, BtiTableWri
{
public static class Types extends
AbstractSSTableFormat.Components.Types
{
- public static final Component.Type PARTITION_INDEX =
Component.Type.createSingleton("PARTITION_INDEX", "Partitions.db",
BtiFormat.class);
- public static final Component.Type ROW_INDEX =
Component.Type.createSingleton("ROW_INDEX", "Rows.db", BtiFormat.class);
+ public static final Component.Type PARTITION_INDEX =
Component.Type.createSingleton("PARTITION_INDEX", "Partitions.db", true,
BtiFormat.class);
+ public static final Component.Type ROW_INDEX =
Component.Type.createSingleton("ROW_INDEX", "Rows.db", true, BtiFormat.class);
}
public final static Component PARTITION_INDEX =
Types.PARTITION_INDEX.getSingleton();
public final static Component ROW_INDEX =
Types.ROW_INDEX.getSingleton();
- private final static Set<Component> STREAMING_COMPONENTS =
ImmutableSet.of(DATA,
-
PARTITION_INDEX,
-
ROW_INDEX,
-
STATS,
-
COMPRESSION_INFO,
-
FILTER,
-
DIGEST,
-
CRC);
-
private final static Set<Component> PRIMARY_COMPONENTS =
ImmutableSet.of(DATA,
PARTITION_INDEX);
@@ -159,12 +150,6 @@ public class BtiFormat extends
AbstractSSTableFormat<BtiTableReader, BtiTableWri
return readerFactory;
}
- @Override
- public Set<Component> streamingComponents()
- {
- return Components.STREAMING_COMPONENTS;
- }
-
@Override
public Set<Component> primaryComponents()
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
new file mode 100644
index 0000000000..3069e807a8
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+ // streaming sends events every 65k, so need to make sure that the files
are larger than this to hit
+ // all cases of the vtable - hence we add a big enough blob column
+ private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+ private static final int NUM_COMPONENTS;
+
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ NUM_COMPONENTS = sstableStreamingComponentsCount()
+ + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+ + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+ }
+
+ private static int sstableStreamingComponentsCount()
+ {
+ return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+ .allComponents()
+ .stream()
+ .filter(c -> c.type.streamable)
+ .count() - 1; // -1 because we don't
include the compression component
+ }
+
+ @Test
+ public void zeroCopy() throws IOException
+ {
+ test(true);
+ }
+
+ @Test
+ public void notZeroCopy() throws IOException
+ {
+ test(false);
+ }
+
+ private void test(boolean zeroCopyStreaming) throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(c ->
c.with(Feature.values())
+
.set("stream_entire_sstables", zeroCopyStreaming)
+
.set("streaming_slow_events_log_timeout", "0s"))
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace(
+ "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob)
WITH compression = { 'enabled' : false };"
+ ));
+ cluster.schemaChange(withKeyspace(
+ "CREATE CUSTOM INDEX ON %s.test(v) USING
'StorageAttachedIndex';"
+ ));
+ cluster.stream().forEach(i ->
+ i.nodetoolResult("disableautocompaction",
KEYSPACE).asserts().success()
+ );
+ IInvokableInstance first = cluster.get(1);
+ IInvokableInstance second = cluster.get(2);
+ long sstableCount = 10;
+ long expectedFiles = zeroCopyStreaming ? sstableCount *
NUM_COMPONENTS : sstableCount;
+ for (int i = 0; i < sstableCount; i++)
+ {
+ first.executeInternal(withKeyspace("insert into %s.test(pk, v,
b) values (?, ?, ?)"), i, "v" + i, BLOB);
+ first.flush(KEYSPACE);
+ }
+
+ second.nodetoolResult("rebuild", "--keyspace",
KEYSPACE).asserts().success();
+
+ SimpleQueryResult qr = first.executeInternalWithResult("SELECT *
FROM system_views.streaming");
+ String txt = QueryResultUtil.expand(qr);
+ qr.reset();
+ assertThat(qr.toObjectArrays().length).describedAs("Found
rows\n%s", txt).isEqualTo(1);
+ assertThat(qr.hasNext()).isTrue();
+ Row row = qr.next();
+ QueryResultUtil.assertThat(row)
+ .isEqualTo("peers",
Collections.singletonList(second.broadcastAddress().toString()))
+ .isEqualTo("follower", true)
+ .isEqualTo("operation", "Rebuild")
+ .isEqualTo("status", "success")
+ .isEqualTo("progress_percentage", 100.0F)
+ .isEqualTo("success_message",
null).isEqualTo("failure_cause", null)
+ .isEqualTo("files_sent", expectedFiles)
+ .columnsEqualTo("files_sent", "files_to_send")
+ .columnsEqualTo("bytes_sent", "bytes_to_send")
+ .isEqualTo("files_received", 0L)
+ .columnsEqualTo("files_received",
"files_to_receive", "bytes_received", "bytes_to_receive");
+ long totalBytes = row.getLong("bytes_sent");
+ assertThat(totalBytes).isGreaterThan(0);
+
+ qr = second.executeInternalWithResult("SELECT * FROM
system_views.streaming");
+ txt = QueryResultUtil.expand(qr);
+ qr.reset();
+ assertThat(qr.toObjectArrays().length).describedAs("Found
rows\n%s", txt).isEqualTo(1);
+ assertThat(qr.hasNext()).isTrue();
+
+ QueryResultUtil.assertThat(qr.next())
+ .isEqualTo("peers",
Collections.singletonList(first.broadcastAddress().toString()))
+ .isEqualTo("follower", false)
+ .isEqualTo("operation", "Rebuild")
+ .isEqualTo("status", "success")
+ .isEqualTo("progress_percentage", 100.0F)
+ .isEqualTo("success_message",
null).isEqualTo("failure_cause", null)
+ .columnsEqualTo("files_to_receive",
"files_received").isEqualTo("files_received", expectedFiles)
+ .columnsEqualTo("bytes_to_receive",
"bytes_received").isEqualTo("bytes_received", totalBytes)
+ .columnsEqualTo("files_sent", "files_to_send",
"bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L);
+
+ // did we trigger slow event log?
+ cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling
streaming events took longer than").getResult())
+ .describedAs("Unable to find slow
log for node%d", i.config().num())
+ .isNotEmpty());
+
+ for (int i = 0; i < sstableCount; i++)
+ {
+ Object[][] rs = second.executeInternal(withKeyspace("select pk
from %s.test where v = ?"), "v" + i);
+ assertThat(rs.length).isEqualTo(1);
+ assertThat(rs[0][0]).isEqualTo(i);
+ }
+ }
+ }
+}
diff --git
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index 59acfc6ae6..dd522d0805 100644
---
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -121,7 +121,7 @@ public class ZeroCopyStreamingBenchmark
sstable = store.getLiveSSTables().iterator().next();
session = setupStreamingSessionForTest();
- context = ComponentContext.create(sstable.descriptor);
+ context = ComponentContext.create(sstable);
blockStreamWriter = new
CassandraEntireSSTableStreamWriter(sstable, session, context);
CapturingNettyChannel blockStreamCaptureChannel = new
CapturingNettyChannel(STREAM_SIZE);
diff --git
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index 898da7c782..1b5112076d 100644
---
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -117,7 +117,7 @@ public class CassandraEntireSSTableStreamWriterTest
EmbeddedChannel channel = new EmbeddedChannel();
try (AsyncStreamingOutputPlus out = new
AsyncStreamingOutputPlus(channel);
- ComponentContext context = ComponentContext.create(descriptor))
+ ComponentContext context = ComponentContext.create(sstable))
{
CassandraEntireSSTableStreamWriter writer = new
CassandraEntireSSTableStreamWriter(sstable, session, context);
@@ -140,7 +140,7 @@ public class CassandraEntireSSTableStreamWriterTest
ByteBuf serializedFile = Unpooled.buffer(8192);
EmbeddedChannel channel = createMockNettyChannel(serializedFile);
try (AsyncStreamingOutputPlus out = new
AsyncStreamingOutputPlus(channel);
- ComponentContext context = ComponentContext.create(descriptor))
+ ComponentContext context = ComponentContext.create(sstable))
{
CassandraEntireSSTableStreamWriter writer = new
CassandraEntireSSTableStreamWriter(sstable, session, context);
writer.write(out);
diff --git
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
index a0e4e51068..851ac9cb3f 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -108,7 +108,7 @@ public class CassandraStreamHeaderTest
// verify all component on-disk length is used for ZCS
CassandraStreamHeader header = header(true, true);
long transferedSize = header.size();
- assertEquals(ComponentManifest.create(sstable.descriptor).totalSize(),
transferedSize);
+ assertEquals(ComponentManifest.create(sstable).totalSize(),
transferedSize);
assertEquals(transferedSize, header.calculateSize());
// verify that computing file chunks doesn't change transferred size
for ZCS
@@ -142,7 +142,7 @@ public class CassandraStreamHeaderTest
TableMetadata metadata = store.metadata();
SerializationHeader.Component serializationHeader =
SerializationHeader.makeWithoutStats(metadata).toComponent();
- ComponentManifest componentManifest = entireSSTable ?
ComponentManifest.create(sstable.descriptor) : null;
+ ComponentManifest componentManifest = entireSSTable ?
ComponentManifest.create(sstable) : null;
DecoratedKey firstKey = entireSSTable ? sstable.first : null;
return CassandraStreamHeader.builder()
diff --git a/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java
b/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java
index a9b76303f2..e58238c444 100644
--- a/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/ComponentTest.java
@@ -57,18 +57,18 @@ public class ComponentTest
Function<Type, Component> componentFactory =
Mockito.mock(Function.class);
// do not allow to define a type with the same name or repr as the
existing type for this or parent format
- assertThatExceptionOfType(AssertionError.class).isThrownBy(() ->
Type.createSingleton(Components.Types.TOC.name, Components.Types.TOC.repr +
"x", Format1.class));
- assertThatExceptionOfType(AssertionError.class).isThrownBy(() ->
Type.createSingleton(Components.Types.TOC.name + "x",
Components.Types.TOC.repr, Format2.class));
+ assertThatExceptionOfType(AssertionError.class).isThrownBy(() ->
Type.createSingleton(Components.Types.TOC.name, Components.Types.TOC.repr +
"x", true, Format1.class));
+ assertThatExceptionOfType(AssertionError.class).isThrownBy(() ->
Type.createSingleton(Components.Types.TOC.name + "x",
Components.Types.TOC.repr, true, Format2.class));
// allow to define a format with other name and repr
- Type t1 = Type.createSingleton("ONE", "One.db", Format1.class);
+ Type t1 = Type.createSingleton("ONE", "One.db", true, Format1.class);
// allow to define a format with the same name and repr for two
different formats
- Type t2f1 = Type.createSingleton("TWO", "Two.db", Format1.class);
- Type t2f2 = Type.createSingleton("TWO", "Two.db", Format2.class);
+ Type t2f1 = Type.createSingleton("TWO", "Two.db", true, Format1.class);
+ Type t2f2 = Type.createSingleton("TWO", "Two.db", true, Format2.class);
assertThat(t2f1).isNotEqualTo(t2f2);
- assertThatExceptionOfType(NullPointerException.class).isThrownBy(() ->
Type.createSingleton(null, "-Three.db", Format1.class));
+ assertThatExceptionOfType(NullPointerException.class).isThrownBy(() ->
Type.createSingleton(null, "-Three.db", true, Format1.class));
assertThat(Type.fromRepresentation("should be custom",
BigFormat.getInstance())).isSameAs(Components.Types.CUSTOM);
assertThat(Type.fromRepresentation(Components.Types.TOC.repr,
BigFormat.getInstance())).isSameAs(Components.Types.TOC);
@@ -80,10 +80,10 @@ public class ComponentTest
@Test
public void testComponents()
{
- Type t3f1 = Type.createSingleton("THREE", "Three.db", Format1.class);
- Type t3f2 = Type.createSingleton("THREE", "Three.db", Format2.class);
- Type t4f1 = Type.create("FOUR", ".*-Four.db", Format1.class);
- Type t4f2 = Type.create("FOUR", ".*-Four.db", Format2.class);
+ Type t3f1 = Type.createSingleton("THREE", "Three.db", true,
Format1.class);
+ Type t3f2 = Type.createSingleton("THREE", "Three.db", true,
Format2.class);
+ Type t4f1 = Type.create("FOUR", ".*-Four.db", true, Format1.class);
+ Type t4f2 = Type.create("FOUR", ".*-Four.db", true, Format2.class);
Component c1 = t3f1.getSingleton();
Component c2 = t3f2.getSingleton();
diff --git
a/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java
b/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java
index 11b92ac499..4d64a73602 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableZeroCopyWriterTest.java
@@ -168,7 +168,7 @@ public class SSTableZeroCopyWriterTest
try
{
- btzcw.writeComponent(component.type, pair.left,
pair.right);
+ btzcw.writeComponent(component, pair.left, pair.right);
}
catch (ClosedChannelException e)
{
diff --git
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index 5a8f4a58cf..a4022cb346 100644
---
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -137,7 +137,7 @@ public class EntireSSTableStreamingCorrectFilesCountTest
int totalNumberOfFiles =
session.transfers.get(store.metadata.id).getTotalNumberOfFiles();
-
assertEquals(ComponentManifest.create(sstable.descriptor).components().size(),
totalNumberOfFiles);
+ assertEquals(ComponentManifest.create(sstable).components().size(),
totalNumberOfFiles);
assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]