This is an automated email from the ASF dual-hosted git repository.
djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b1411a4 Fix streaming stats during entire sstable streaming
b1411a4 is described below
commit b1411a43180e0085ae4741f4da567a08b5a28f17
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Mon Apr 6 10:11:42 2020 +0200
Fix streaming stats during entire sstable streaming
Patch by Stefan Miklosovic; Reviewed by Dinesh Joshi for CASSANDRA-15694
---
.../db/streaming/CassandraIncomingFile.java | 10 +
.../db/streaming/CassandraOutgoingFile.java | 21 +-
.../apache/cassandra/streaming/IncomingStream.java | 1 +
.../apache/cassandra/streaming/OutgoingStream.java | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 2 +-
.../cassandra/streaming/StreamTransferTask.java | 16 +-
...ntireSSTableStreamingCorrectFilesCountTest.java | 236 +++++++++++++++++++++
.../streaming/StreamTransferTaskTest.java | 4 +-
8 files changed, 278 insertions(+), 13 deletions(-)
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index c65ca62..807d935 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -46,6 +46,7 @@ public class CassandraIncomingFile implements IncomingStream
private volatile SSTableMultiWriter sstable;
private volatile long size = -1;
+ private volatile int numFiles = 1;
private static final Logger logger =
LoggerFactory.getLogger(CassandraIncomingFile.class);
@@ -64,7 +65,10 @@ public class CassandraIncomingFile implements IncomingStream
IStreamReader reader;
if (streamHeader.isEntireSSTable)
+ {
reader = new CassandraEntireSSTableStreamReader(header,
streamHeader, session);
+ numFiles = streamHeader.componentManifest.components().size();
+ }
else if (streamHeader.isCompressed())
reader = new CassandraCompressedStreamReader(header, streamHeader,
session);
else
@@ -88,6 +92,12 @@ public class CassandraIncomingFile implements IncomingStream
}
@Override
+ public int getNumFiles()
+ {
+ return numFiles;
+ }
+
+ @Override
public TableId getTableId()
{
Preconditions.checkState(sstable != null, "Stream hasn't been read
yet");
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 237c0af..0917fba 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -59,7 +59,7 @@ public class CassandraOutgoingFile implements OutgoingStream
private final boolean keepSSTableLevel;
private final ComponentManifest manifest;
- private final boolean shouldStreamEntireSStable;
+ private final boolean shouldStreamEntireSSTable;
public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader>
ref,
List<SSTableReader.PartitionPositionBounds>
sections, List<Range<Token>> normalizedRanges,
@@ -72,7 +72,7 @@ public class CassandraOutgoingFile implements OutgoingStream
this.sections = sections;
this.filename = ref.get().getFilename();
this.manifest = getComponentManifest(ref.get());
- this.shouldStreamEntireSStable = shouldStreamEntireSSTable();
+ this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
SSTableReader sstable = ref.get();
keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation
== StreamOperation.REBUILD;
@@ -85,7 +85,7 @@ public class CassandraOutgoingFile implements OutgoingStream
.withSections(sections)
.withCompressionMetadata(sstable.compression
? sstable.getCompressionMetadata() : null)
.withSerializationHeader(sstable.header.toComponent())
- .isEntireSSTable(shouldStreamEntireSStable)
+ .isEntireSSTable(shouldStreamEntireSSTable)
.withComponentManifest(manifest)
.withFirstKey(sstable.first)
.withTableId(sstable.metadata().id)
@@ -137,6 +137,12 @@ public class CassandraOutgoingFile implements
OutgoingStream
}
@Override
+ public int getNumFiles()
+ {
+ return shouldStreamEntireSSTable ? getManifestSize() : 1;
+ }
+
+ @Override
public long getRepairedAt()
{
return ref.get().getRepairedAt();
@@ -148,6 +154,11 @@ public class CassandraOutgoingFile implements
OutgoingStream
return ref.get().getPendingRepair();
}
+ public int getManifestSize()
+ {
+ return manifest.components().size();
+ }
+
@Override
public void write(StreamSession session, DataOutputStreamPlus out, int
version) throws IOException
{
@@ -155,7 +166,7 @@ public class CassandraOutgoingFile implements OutgoingStream
CassandraStreamHeader.serializer.serialize(header, out, version);
out.flush();
- if (shouldStreamEntireSStable && out instanceof
AsyncStreamingOutputPlus)
+ if (shouldStreamEntireSSTable && out instanceof
AsyncStreamingOutputPlus)
{
CassandraEntireSSTableStreamWriter writer = new
CassandraEntireSSTableStreamWriter(sstable, session, manifest);
writer.write((AsyncStreamingOutputPlus) out);
@@ -171,7 +182,7 @@ public class CassandraOutgoingFile implements OutgoingStream
}
@VisibleForTesting
- public boolean shouldStreamEntireSSTable()
+ public boolean computeShouldStreamEntireSSTables()
{
// don't stream if full sstable transfers are disabled or legacy counter shards are present
if (!DatabaseDescriptor.streamEntireSSTables() ||
ref.get().getSSTableMetadata().hasLegacyCounterShards)
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java
b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 18bebf5..55fbd4f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -41,5 +41,6 @@ public interface IncomingStream
String getName();
long getSize();
+ int getNumFiles();
TableId getTableId();
}
diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java
b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
index e71b985..4a58cae 100644
--- a/src/java/org/apache/cassandra/streaming/OutgoingStream.java
+++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
@@ -49,4 +49,5 @@ public interface OutgoingStream
String getName();
long getSize();
TableId getTableId();
+ int getNumFiles();
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 87d6ce0..25977a5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -82,7 +82,7 @@ public class StreamReceiveTask extends StreamTask
return;
}
- remoteStreamsReceived++;
+ remoteStreamsReceived += stream.getNumFiles();
bytesReceived += stream.getSize();
Preconditions.checkArgument(tableId.equals(stream.getTableId()));
logger.debug("received {} of {} total files {} of total bytes {}",
remoteStreamsReceived, totalStreams,
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index ba05acd..0f7a834 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -17,16 +17,20 @@
*/
package org.apache.cassandra.streaming;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +56,8 @@ public class StreamTransferTask extends StreamTask
protected final Map<Integer, OutgoingStreamMessage> streams = new
HashMap<>();
private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
- private long totalSize;
+ private long totalSize = 0;
+ private int totalFiles = 0;
public StreamTransferTask(StreamSession session, TableId tableId)
{
@@ -66,6 +71,7 @@ public class StreamTransferTask extends StreamTask
message = StreamHook.instance.reportOutgoingStream(session, stream,
message);
streams.put(message.header.sequenceNumber, message);
totalSize += message.stream.getSize();
+ totalFiles += message.stream.getNumFiles();
}
/**
@@ -125,7 +131,7 @@ public class StreamTransferTask extends StreamTask
public synchronized int getTotalNumberOfFiles()
{
- return streams.size();
+ return totalFiles;
}
public long getTotalSize()
diff --git
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
new file mode 100644
index 0000000..a57fcbc
--- /dev/null
+++
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.net.AsyncStreamingOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SharedDefaultFileRegion;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import static
org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class EntireSSTableStreamingCorrectFilesCountTest
+{
+ public static final String KEYSPACE =
"EntireSSTableStreamingCorrectFilesCountTest";
+ public static final String CF_STANDARD = "Standard1";
+
+ private static SSTableReader sstable;
+ private static ColumnFamilyStore store;
+ private static RangesAtEndpoint rangesAtEndpoint;
+
+ @BeforeClass
+ public static void defineSchemaAndPrepareSSTable()
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE,
CF_STANDARD)
+ // LeveledCompactionStrategy is important here:
+ // streaming of entire SSTables currently works only with this strategy
+
.compaction(CompactionParams.lcs(Collections.emptyMap()))
+
.partitioner(ByteOrderedPartitioner.instance));
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+
+ for (int j = 0; j < 10; j++)
+ {
+ new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ sstable = store.getLiveSSTables().iterator().next();
+
+ Token start =
ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(0));
+ Token end =
ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(100));
+
+ rangesAtEndpoint =
RangesAtEndpoint.toDummyList(Collections.singleton(new Range<>(start, end)));
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ FileCountingStreamEventHandler streamEventHandler = new
FileCountingStreamEventHandler();
+ StreamSession session =
setupStreamingSessionForTest(streamEventHandler);
+ Collection<OutgoingStream> outgoingStreams =
store.getStreamManager().createOutgoingStreams(session,
+
rangesAtEndpoint,
+
NO_PENDING_REPAIR,
+
PreviewKind.NONE);
+
+ session.addTransferStreams(outgoingStreams);
+ DataOutputStreamPlus out = constructDataOutputStream();
+
+ for (OutgoingStream outgoingStream : outgoingStreams)
+ outgoingStream.write(session, out, MessagingService.VERSION_40);
+
+ int totalNumberOfFiles =
session.transfers.get(store.metadata.id).getTotalNumberOfFiles();
+
+
assertEquals(CassandraOutgoingFile.getComponentManifest(sstable).components().size(),
totalNumberOfFiles);
+ assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles);
+ }
+
+ private DataOutputStreamPlus constructDataOutputStream()
+ {
+ // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
+ ByteBuf serializedFile = Unpooled.buffer(8192);
+ EmbeddedChannel channel = createMockNettyChannel(serializedFile);
+ return new AsyncStreamingOutputPlus(channel)
+ {
+ public void flush() throws IOException
+ {
+ // NO-OP
+ }
+ };
+ }
+
+ private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile)
+ {
+ WritableByteChannel wbc = new WritableByteChannel()
+ {
+ private boolean isOpen = true;
+
+ public int write(ByteBuffer src)
+ {
+ int size = src.limit();
+ serializedFile.writeBytes(src);
+ return size;
+ }
+
+ public boolean isOpen()
+ {
+ return isOpen;
+ }
+
+ public void close()
+ {
+ isOpen = false;
+ }
+ };
+
+ return new EmbeddedChannel(new ChannelOutboundHandlerAdapter()
+ {
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception
+ {
+ ((SharedDefaultFileRegion) msg).transferTo(wbc, 0);
+ super.write(ctx, msg, promise);
+ }
+ });
+ }
+
+
+ private StreamSession setupStreamingSessionForTest(StreamEventHandler
streamEventHandler)
+ {
+ StreamCoordinator streamCoordinator = new
StreamCoordinator(StreamOperation.BOOTSTRAP,
+ 1,
+ new
DefaultConnectionFactory(),
+ false,
+ null,
+
PreviewKind.NONE);
+
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(),
+
StreamOperation.BOOTSTRAP,
+
Collections.singleton(streamEventHandler),
+ streamCoordinator);
+
+ InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+ streamCoordinator.addSessionInfo(new SessionInfo(peer,
+ 0,
+ peer,
+
Collections.emptyList(),
+
Collections.emptyList(),
+
StreamSession.State.INITIALIZED));
+
+ StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+ session.init(future);
+
+ return session;
+ }
+
+ private static final class FileCountingStreamEventHandler implements
StreamEventHandler
+ {
+ final Collection<String> fileNames = new ArrayList<>();
+
+ public void handleStreamEvent(StreamEvent event)
+ {
+ if (event.eventType == StreamEvent.Type.FILE_PROGRESS && event
instanceof StreamEvent.ProgressEvent)
+ {
+ StreamEvent.ProgressEvent progressEvent =
((StreamEvent.ProgressEvent) event);
+ fileNames.add(progressEvent.progress.fileName);
+ }
+ }
+
+ public void onSuccess(@Nullable StreamState streamState)
+ {
+ assert streamState != null;
+ assertFalse(streamState.hasFailedSession());
+ }
+
+ public void onFailure(Throwable throwable)
+ {
+ fail();
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 2361125..2f4feff 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -95,7 +95,7 @@ public class StreamTransferTaskTest
ranges.add(new Range<>(sstable.first.getToken(),
sstable.last.getToken()));
task.addTransferStream(new
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(),
sstable.getPositionsForRanges(ranges), ranges, 1));
}
- assertEquals(2, task.getTotalNumberOfFiles());
+ assertEquals(14, task.getTotalNumberOfFiles());
// if file sending completes before timeout then the task should be canceled.
Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS);
@@ -147,7 +147,7 @@ public class StreamTransferTaskTest
refs.add(ref);
task.addTransferStream(new
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref,
sstable.getPositionsForRanges(ranges), ranges, 1));
}
- assertEquals(2, task.getTotalNumberOfFiles());
+ assertEquals(14, task.getTotalNumberOfFiles());
//add task to stream session, so it is aborted when stream session fails
session.transfers.put(TableId.generate(), task);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]