Repository: cassandra Updated Branches: refs/heads/trunk 16bf51211 -> a5b90f15c
Only stream from unrepaired sstables during incremental repair Patch by marcuse; reviewed by yukim for CASSANDRA-8267 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5b90f15 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5b90f15 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5b90f15 Branch: refs/heads/trunk Commit: a5b90f15c53e256bff4ad382745e34a739a5983a Parents: 16bf512 Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Dec 8 15:17:51 2014 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon May 11 09:29:09 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 11 +++++++++-- .../org/apache/cassandra/io/sstable/SSTableLoader.java | 2 +- .../cassandra/net/IncomingStreamingConnection.java | 2 +- src/java/org/apache/cassandra/repair/LocalSyncTask.java | 9 ++++++++- .../apache/cassandra/repair/StreamingRepairTask.java | 9 ++++++++- .../apache/cassandra/streaming/ConnectionHandler.java | 3 ++- .../apache/cassandra/streaming/StreamCoordinator.java | 8 +++++--- src/java/org/apache/cassandra/streaming/StreamPlan.java | 8 ++++---- .../apache/cassandra/streaming/StreamResultFuture.java | 9 +++++---- .../org/apache/cassandra/streaming/StreamSession.java | 12 ++++++++++-- .../cassandra/streaming/messages/StreamInitMessage.java | 9 +++++++-- .../org/apache/cassandra/dht/StreamStateStoreTest.java | 4 ++-- .../cassandra/streaming/StreamTransferTaskTest.java | 2 +- 14 files changed, 64 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f2f12c4..bff5970 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Only stream from unrepaired sstables with incremental repair (CASSANDRA-8267) * Aggregate UDFs allow SFUNC return type to differ from STYPE if FFUNC specified (CASSANDRA-9321) * Failure detector detects and ignores local pauses (CASSANDRA-9183) * Remove Thrift dependencies in bundled tools (CASSANDRA-8358) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 26a430a..fec3afc 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1816,7 +1816,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a ViewFragment containing the sstables and memtables that may need to be merged * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree. */ - public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection) + public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired) { return new Function<DataTracker.View, List<SSTableReader>>() { @@ -1824,8 +1824,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { Set<SSTableReader> sstables = Sets.newHashSet(); for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection) - sstables.addAll(view.sstablesInBounds(rowBounds)); + { + for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) + { + if (includeRepaired || !sstable.isRepaired()) + sstables.add(sstable); + } + } + logger.debug("ViewFilter for {}/{} sstables", sstables.size(), getSSTables().size()); return ImmutableList.copyOf(sstables); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 6991958..910cdcc 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -156,7 +156,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false).connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index d00e4b8..274e47b 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -66,7 +66,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to // parallelize said streams and the socket is blocking, so we might deadlock. - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 010c959..daace01 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; @@ -65,8 +66,14 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); logger.info("[repair #{}] {}", desc.sessionId, message); + boolean isIncremental = false; + if (desc.parentSessionId != null) + { + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); + isIncremental = prs.isIncremental; + } Tracing.traceRepair(message); - new StreamPlan("Repair", repairedAt, 1, false).listeners(this) + new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index cbf0580..25ef06e 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncComplete; import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamPlan; @@ -55,7 +56,13 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler InetAddress dest = request.dst; InetAddress preferred = SystemKeyspace.getPreferredIP(dest); logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst)); - new StreamPlan("Repair", repairedAt, 1, false).listeners(this) + boolean isIncremental = false; + if (desc.parentSessionId != null) + { + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); + isIncremental = prs.isIncremental; + } + new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 780018c..bb27972 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -183,7 +183,8 @@ public class ConnectionHandler session.planId(), session.description(), isForOutgoing, - session.keepSSTableLevel()); + session.keepSSTableLevel(), + session.isIncremental()); ByteBuffer messageBuf = message.createMessage(false, protocolVersion); DataOutputStreamPlus out = getWriteChannel(socket); out.write(messageBuf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 8d0cdce..603366d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -46,12 +46,14 @@ public class StreamCoordinator private final int connectionsPerHost; private StreamConnectionFactory factory; private final boolean keepSSTableLevel; + private final boolean isIncremental; - public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory) + public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; + this.isIncremental = isIncremental; } public void setConnectionFactory(StreamConnectionFactory factory) @@ -235,7 +237,7 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel); + StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental); streamSessions.put(++lastReturned, session); return session; } @@ -267,7 +269,7 @@ public class StreamCoordinator StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel); + session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental); streamSessions.put(id, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 0e5cc6f..0d963ed 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -47,19 +47,19 @@ public class StreamPlan */ public StreamPlan(String description) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false); + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false); } public StreamPlan(String description, boolean keepSSTableLevels) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels); + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false); } - public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels) + public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental) { this.description = description; this.repairedAt = repairedAt; - this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory()); + this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory()); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index ce9518a..99adab0 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -69,9 +69,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels) + private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental) { - this(planId, description, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory())); + this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory())); } static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator) @@ -102,7 +102,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> Socket socket, boolean isForOutgoing, int version, - boolean keepSSTableLevel) throws IOException + boolean keepSSTableLevel, + boolean isIncremental) throws IOException { StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) @@ -110,7 +111,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - future = new StreamResultFuture(planId, description, keepSSTableLevel); + future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental); StreamManager.instance.registerReceiving(future); } future.attachSocket(from, sessionIndex, socket, isForOutgoing, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 5a056c4..09ee3e4 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -145,6 +145,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber private AtomicBoolean isAborted = new AtomicBoolean(false); private final boolean keepSSTableLevel; + private final boolean isIncremental; public static enum State { @@ -166,7 +167,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber * @param connecting Actual connecting address * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel) + public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental) { this.peer = peer; this.connecting = connecting; @@ -175,6 +176,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber this.handler = new ConnectionHandler(this); this.metrics = StreamingMetrics.get(connecting); this.keepSSTableLevel = keepSSTableLevel; + this.isIncremental = isIncremental; } public UUID planId() @@ -197,6 +199,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber return keepSSTableLevel; } + public boolean isIncremental() + { + return isIncremental; + } + + /** * Bind this session to report to specific {@link StreamResultFuture} and * perform pre-streaming initialization. @@ -306,7 +314,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) rowBoundsList.add(Range.makeRowRange(range)); - refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs); + refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs); } List<SSTableStreamingSections> sections = new ArrayList<>(refs.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index 03ac944..4928039 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -48,8 +48,9 @@ public class StreamInitMessage // true if this init message is to connect for outgoing message on receiving side public final boolean isForOutgoing; public final boolean keepSSTableLevel; + public final boolean isIncremental; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel) + public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental) { this.from = from; this.sessionIndex = sessionIndex; @@ -57,6 +58,7 @@ public class StreamInitMessage this.description = description; this.isForOutgoing = isForOutgoing; this.keepSSTableLevel = keepSSTableLevel; + this.isIncremental = isIncremental; } /** @@ -109,6 +111,7 @@ public class StreamInitMessage out.writeUTF(message.description); out.writeBoolean(message.isForOutgoing); out.writeBoolean(message.keepSSTableLevel); + out.writeBoolean(message.isIncremental); } public StreamInitMessage deserialize(DataInput in, int version) throws IOException @@ -119,7 +122,8 @@ public class StreamInitMessage String description = in.readUTF(); boolean sentByInitiator = in.readBoolean(); boolean keepSSTableLevel = in.readBoolean(); - return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel); + boolean isIncremental = in.readBoolean(); + return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental); } public long serializedSize(StreamInitMessage message, int version) @@ -130,6 +134,7 @@ public class StreamInitMessage size += TypeSizes.NATIVE.sizeof(message.description); size += TypeSizes.NATIVE.sizeof(message.isForOutgoing); size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel); + size += TypeSizes.NATIVE.sizeof(message.isIncremental); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index c8b9f05..86781d9 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -42,7 +42,7 @@ public class StreamStateStoreTest Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); InetAddress local = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true); + StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false); session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"), 0); StreamStateStore store = new StreamStateStore(); @@ -63,7 +63,7 @@ public class StreamStateStoreTest // add different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); - session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true); + session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false); session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"), 0); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 87b9c38..a0ac870 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -64,7 +64,7 @@ public class StreamTransferTaskTest String cf = "Standard1"; InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(peer, peer, null, 0, true); + StreamSession session = new StreamSession(peer, peer, null, 0, true, false); ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); // create two sstables